diff --git a/.github/workflows/spark_master_python_test.yaml b/.github/workflows/spark_master_python_test.yaml
index 78092037f90..f5cfbb715f5 100644
--- a/.github/workflows/spark_master_python_test.yaml
+++ b/.github/workflows/spark_master_python_test.yaml
@@ -65,6 +65,7 @@ jobs:
           pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
           pipenv run pip install flake8==3.9.0
           pipenv run pip install black==23.12.1
+          pipenv run pip install importlib_metadata==3.10.0
           pipenv run pip install mypy==1.8.0
           pipenv run pip install mypy-protobuf==3.3.0
           pipenv run pip install cryptography==37.0.4
@@ -74,8 +75,15 @@ jobs:
           pipenv run pip install pydocstyle==3.0.0
           pipenv run pip install pandas==2.2.0
           pipenv run pip install pyarrow==11.0.0
-          pipenv run pip install numpy==1.21
-          pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin/pyspark-4.0.0.tar.gz
+          pipenv run pip install pypandoc==1.3.3
+          pipenv run pip install numpy==1.22.4
+          pipenv run pip install grpcio==1.67.0
+          pipenv run pip install grpcio-status==1.67.0
+          pipenv run pip install googleapis-common-protos==1.65.0
+          pipenv run pip install protobuf==5.29.1
+          pipenv run pip install googleapis-common-protos-stubs==2.2.0
+          pipenv run pip install grpc-stubs==1.24.11
+          pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc6-bin/pyspark-4.0.0.tar.gz
         if: steps.git-diff.outputs.diff
       - name: Run Python tests
         # when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
diff --git a/dev/delta-connect-gen-protos.sh b/dev/delta-connect-gen-protos.sh
index 6bea75aa6df..790b2aa30bd 100755
--- a/dev/delta-connect-gen-protos.sh
+++ b/dev/delta-connect-gen-protos.sh
@@ -71,8 +71,10 @@ for f in `find gen/proto/python/delta/connect -name "*.py*"`; do
   if [[ $f == *_pb2.py || $f == *_pb2_grpc.py ]]; then
     sed \
       -e 's/import spark.connect./import pyspark.sql.connect.proto./g' \
-      -e 's/from delta.connect import/from delta.connect.proto import/g' \
+      -e "s/DESCRIPTOR, 'spark.connect/DESCRIPTOR, 'pyspark.sql.connect.proto/g" \
+      -e 's/from spark.connect import/from pyspark.sql.connect.proto import/g' \
       -e "s/DESCRIPTOR, 'delta.connect/DESCRIPTOR, 'delta.connect.proto/g" \
+      -e 's/from delta.connect import/from delta.connect.proto import/g' \
       $f > $f.tmp
     mv $f.tmp $f
   elif [[ $f == *.pyi ]]; then
diff --git a/dev/requirements.txt b/dev/requirements.txt
index e61e99bfef6..382f9df9f0d 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -3,7 +3,12 @@ mypy==1.8.0
 flake8==3.9.0
 
 # Code Formatter
-black==23.9.1
+black==23.12.1
+
+# Spark Connect (required)
+grpcio>=1.67.0
+grpcio-status>=1.67.0
+googleapis-common-protos>=1.65.0
 
 # Spark and Delta Connect python proto generation plugin (optional)
 mypy-protobuf==3.3.0
diff --git a/examples/python/delta_connect.py b/examples/python/delta_connect.py
new file mode 100644
index 00000000000..507989bce43
--- /dev/null
+++ b/examples/python/delta_connect.py
@@ -0,0 +1,100 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+To run this example you must follow these steps:
+
+Requirements:
+- Java 17
+- Spark 4.0.0-preview1+
+- delta-spark (python package) 4.0.0rc1+ and pyspark 4.0.0.dev1+
+
+(1) Start a local Spark Connect server using this command:
+sbin/start-connect-server.sh \
+  --packages org.apache.spark:spark-connect_2.13:4.0.0-preview1,io.delta:delta-connect-server_2.13:{DELTA_VERSION},io.delta:delta-spark_2.13:{DELTA_VERSION},com.google.protobuf:protobuf-java:3.25.1 \
+  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
+  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
+  --conf "spark.connect.extensions.relation.classes"="org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \
+  --conf "spark.connect.extensions.command.classes"="org.apache.spark.sql.connect.delta.DeltaCommandPlugin"
+* Be sure to replace DELTA_VERSION with the version you are using
+
+(2) Set the SPARK_REMOTE environment variable to point to your local Spark Connect server:
+export SPARK_REMOTE="sc://localhost:15002"
+
+(3) Run this file, e.g. python3 examples/python/delta_connect.py
+"""
+
+import os
+import shutil
+from pyspark.sql import SparkSession
+from delta.tables import DeltaTable
+
+filePath = "/tmp/delta_connect"
+tableName = "delta_connect_table"
+
+def assert_dataframe_equals(df1, df2):
+    assert sorted(df1.collect()) == sorted(df2.collect())
+
+def cleanup(spark):
+    shutil.rmtree(filePath, ignore_errors=True)
+    spark.sql(f"DROP TABLE IF EXISTS {tableName}")
+
+# --------------------- Set up Spark Connect session ------------------------
+
+assert os.getenv("SPARK_REMOTE"), "Must point to Spark Connect server using SPARK_REMOTE"
+
+spark = SparkSession.builder \
+    .appName("delta_connect") \
+    .remote(os.getenv("SPARK_REMOTE")) \
+    .getOrCreate()
+
+# Clean up any previous runs
+cleanup(spark)
+
+# -------------- Try reading non-existent table (should fail with an exception) ----------------
+
+# Using forPath
+try:
+    DeltaTable.forPath(spark, filePath).toDF().show()
+except Exception as e:
+    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
+else:
+    assert False, "Expected exception to be thrown for missing table"
+
+# Using forName
+try:
+    DeltaTable.forName(spark, tableName).toDF().show()
+except Exception as e:
+    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
+else:
+    assert False, "Expected exception to be thrown for missing table"
+
+# ------------------------ Write basic table and check that results match ----------------------
+
+# By table name
+spark.range(5).write.format("delta").saveAsTable(tableName)
+assert_dataframe_equals(DeltaTable.forName(spark, tableName).toDF(), spark.range(5))
+assert_dataframe_equals(spark.read.format("delta").table(tableName), spark.range(5))
+assert_dataframe_equals(spark.sql(f"SELECT * FROM {tableName}"), spark.range(5))
+
+# By table path
+spark.range(10).write.format("delta").save(filePath)
+assert_dataframe_equals(DeltaTable.forPath(spark, filePath).toDF(), spark.range(10))
+assert_dataframe_equals(spark.read.format("delta").load(filePath), spark.range(10))
+assert_dataframe_equals(spark.sql(f"SELECT * FROM delta.`{filePath}`"), spark.range(10))
+
+# ---------------------------------- Clean up ----------------------------------------
+cleanup(spark)
diff --git a/python/delta/connect/__init__.py b/python/delta/connect/__init__.py
new file mode 100644
index 00000000000..50efb4c5de2
--- /dev/null
+++ b/python/delta/connect/__init__.py
@@ -0,0 +1,19 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from delta.connect.tables import DeltaTable
+
+__all__ = ['DeltaTable']
diff --git a/python/delta/connect/plan.py b/python/delta/connect/plan.py
new file mode 100644
index 00000000000..5aafbf03d04
--- /dev/null
+++ b/python/delta/connect/plan.py
@@ -0,0 +1,55 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Optional
+
+import delta.connect.proto as proto
+
+from pyspark.sql.connect.client import SparkConnectClient
+from pyspark.sql.connect.plan import LogicalPlan
+import pyspark.sql.connect.proto as spark_proto
+
+
+class DeltaLogicalPlan(LogicalPlan):
+    def __init__(self, child: Optional[LogicalPlan]) -> None:
+        super().__init__(child)
+
+    def plan(self, session: SparkConnectClient) -> spark_proto.Relation:
+        plan = spark_proto.Relation()
+        plan.extension.Pack(self.to_delta_relation(session))
+        return plan
+
+    def to_delta_relation(self, session: SparkConnectClient) -> proto.DeltaRelation:
+        ...
+
+    def command(self, session: SparkConnectClient) -> spark_proto.Command:
+        command = spark_proto.Command()
+        command.extension.Pack(self.to_delta_command(session))
+        return command
+
+    def to_delta_command(self, session: SparkConnectClient) -> proto.DeltaCommand:
+        ...
+
+
+class DeltaScan(DeltaLogicalPlan):
+    def __init__(self, table: proto.DeltaTable) -> None:
+        super().__init__(None)
+        self._table = table
+
+    def to_delta_relation(self, client: SparkConnectClient) -> proto.DeltaRelation:
+        relation = proto.DeltaRelation()
+        relation.scan.table.CopyFrom(self._table)
+        return relation
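
[Editorial note: a minimal sketch, not part of the patch, of the extension mechanism DeltaLogicalPlan relies on. Spark Connect's generic Relation carries a google.protobuf.Any field named extension; the client packs the Delta-specific message into it, and the server-side DeltaRelationPlugin recognizes the type URL and unpacks it. Assumes the generated delta.connect.proto modules from this patch are importable; the path is illustrative.]

import delta.connect.proto as proto
import pyspark.sql.connect.proto as spark_proto

# Build the Delta-specific relation: a scan over a path-based table,
# mirroring what DeltaScan.to_delta_relation() produces.
table = proto.DeltaTable()
table.path.path = "/tmp/delta_connect"  # illustrative path
delta_relation = proto.DeltaRelation()
delta_relation.scan.table.CopyFrom(table)

# Pack it into the generic Spark Connect Relation, as DeltaLogicalPlan.plan() does.
relation = spark_proto.Relation()
relation.extension.Pack(delta_relation)

# The Any's type URL is what lets the server plugin recognize the payload.
assert relation.extension.Is(proto.DeltaRelation.DESCRIPTOR)
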
diff --git a/python/delta/connect/proto/__init__.py b/python/delta/connect/proto/__init__.py
new file mode 100644
index 00000000000..f0fa634f5fc
--- /dev/null
+++ b/python/delta/connect/proto/__init__.py
@@ -0,0 +1,19 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from delta.connect.proto.base_pb2 import *
+from delta.connect.proto.commands_pb2 import *
+from delta.connect.proto.relations_pb2 import *
diff --git a/python/delta/connect/proto/relations_pb2.py b/python/delta/connect/proto/relations_pb2.py
index 60e9c470a9a..59936ad51dc 100644
--- a/python/delta/connect/proto/relations_pb2.py
+++ b/python/delta/connect/proto/relations_pb2.py
@@ -28,9 +28,9 @@
 from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2
-from spark.connect import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
-from spark.connect import relations_pb2 as spark_dot_connect_dot_relations__pb2
-from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2
+from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
+from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2
+from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
diff --git a/python/delta/connect/tables.py b/python/delta/connect/tables.py
new file mode 100644
index 00000000000..bf6b7d20ebd
--- /dev/null
+++ b/python/delta/connect/tables.py
@@ -0,0 +1,88 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Dict, Optional
+
+from delta.connect.plan import DeltaScan
+import delta.connect.proto as proto
+from delta.tables import DeltaTable as LocalDeltaTable
+
+from pyspark.sql.connect.dataframe import DataFrame
+from pyspark.sql.connect.plan import LogicalPlan, SubqueryAlias
+from pyspark.sql.connect.session import SparkSession
+
+
+class DeltaTable(object):
+    def __init__(
+        self,
+        spark: SparkSession,
+        path: Optional[str] = None,
+        tableOrViewName: Optional[str] = None,
+        hadoopConf: Dict[str, str] = dict(),
+        plan: Optional[LogicalPlan] = None
+    ) -> None:
+        self._spark = spark
+        self._path = path
+        self._tableOrViewName = tableOrViewName
+        self._hadoopConf = hadoopConf
+        if plan is not None:
+            self._plan = plan
+        else:
+            self._plan = DeltaScan(self._to_proto())
+
+    def toDF(self) -> DataFrame:
+        return DataFrame(self._plan, session=self._spark)
+
+    def alias(self, aliasName: str) -> "DeltaTable":
+        return DeltaTable(
+            self._spark,
+            self._path,
+            self._tableOrViewName,
+            self._hadoopConf,
+            SubqueryAlias(self._plan, aliasName)
+        )
+
+    @classmethod
+    def forPath(
+        cls,
+        sparkSession: SparkSession,
+        path: str,
+        hadoopConf: Dict[str, str] = dict()
+    ) -> "DeltaTable":
+        assert sparkSession is not None
+        return DeltaTable(sparkSession, path=path, hadoopConf=hadoopConf)
+
+    @classmethod
+    def forName(
+        cls, sparkSession: SparkSession, tableOrViewName: str
+    ) -> "DeltaTable":
+        assert sparkSession is not None
+        return DeltaTable(sparkSession, tableOrViewName=tableOrViewName)
+
+    def _to_proto(self) -> proto.DeltaTable:
+        result = proto.DeltaTable()
+        if self._path is not None:
+            result.path.path = self._path
+        if self._tableOrViewName is not None:
+            result.table_or_view_name = self._tableOrViewName
+        return result
+
+
+DeltaTable.__doc__ = LocalDeltaTable.__doc__
+DeltaTable.toDF.__doc__ = LocalDeltaTable.toDF.__doc__
+DeltaTable.alias.__doc__ = LocalDeltaTable.alias.__doc__
+DeltaTable.forPath.__func__.__doc__ = LocalDeltaTable.forPath.__doc__
+DeltaTable.forName.__func__.__doc__ = LocalDeltaTable.forName.__doc__
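
[Editorial note: a usage sketch, not part of the patch, for the client-side DeltaTable above. It assumes a Delta Connect server is running and SPARK_REMOTE is set as described in examples/python/delta_connect.py, and that the table created by that example exists. Everything composes into a Connect plan on the client; no JVM access is involved.]

import os
from pyspark.sql.connect.session import SparkSession
from delta.connect.tables import DeltaTable

spark = SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()

# forName builds a DeltaScan plan, alias wraps it in a SubqueryAlias, and
# toDF yields a Connect DataFrame that is only executed on show()/collect().
dt = DeltaTable.forName(spark, "delta_connect_table").alias("t")
dt.toDF().show()
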
diff --git a/python/delta/connect/testing/__init__.py b/python/delta/connect/testing/__init__.py
new file mode 100644
index 00000000000..46b3e9c7d5c
--- /dev/null
+++ b/python/delta/connect/testing/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/delta/connect/testing/utils.py b/python/delta/connect/testing/utils.py
new file mode 100644
index 00000000000..9b75947f77a
--- /dev/null
+++ b/python/delta/connect/testing/utils.py
@@ -0,0 +1,58 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+import shutil
+import os
+
+from pyspark import SparkConf
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class DeltaTestCase(ReusedConnectTestCase):
+    """
+    Test suite base for setting up a properly configured SparkSession for using Delta Connect.
+    """
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        # Spark Connect will set SPARK_CONNECT_TESTING_REMOTE, and it does not allow MASTER
+        # to be set simultaneously, so we need to clear it.
+        # TODO(long.vu): Find a cleaner way to clear "MASTER".
+        if "MASTER" in os.environ:
+            del os.environ["MASTER"]
+        super(DeltaTestCase, cls).setUpClass()
+
+    @classmethod
+    def conf(cls) -> SparkConf:
+        _conf = super(DeltaTestCase, cls).conf()
+        _conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        _conf.set("spark.sql.catalog.spark_catalog",
+                  "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+        _conf.set("spark.connect.extensions.relation.classes",
+                  "org.apache.spark.sql.connect.delta.DeltaRelationPlugin")
+        _conf.set("spark.connect.extensions.command.classes",
+                  "org.apache.spark.sql.connect.delta.DeltaCommandPlugin")
+        return _conf
+
+    def setUp(self) -> None:
+        super(DeltaTestCase, self).setUp()
+        self.tempPath = tempfile.mkdtemp()
+        self.tempFile = os.path.join(self.tempPath, "tempFile")
+
+    def tearDown(self) -> None:
+        super(DeltaTestCase, self).tearDown()
+        shutil.rmtree(self.tempPath)
diff --git a/python/delta/connect/tests/__init__.py b/python/delta/connect/tests/__init__.py
new file mode 100644
index 00000000000..46b3e9c7d5c
--- /dev/null
+++ b/python/delta/connect/tests/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
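
[Editorial note: a sketch, not part of the patch, of how DeltaTestCase is meant to be subclassed. It assumes ReusedConnectTestCase exposes the remote session as self.spark; the test body and row count are illustrative.]

import unittest

from delta.connect.tables import DeltaTable
from delta.connect.testing.utils import DeltaTestCase


class DeltaConnectRoundTripTest(DeltaTestCase):
    def test_write_then_read_back(self):
        # self.tempFile is a fresh path inside a per-test temp directory.
        self.spark.range(3).write.format("delta").save(self.tempFile)
        rows = DeltaTable.forPath(self.spark, self.tempFile).toDF().collect()
        self.assertEqual(len(rows), 3)


if __name__ == "__main__":
    unittest.main()
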
diff --git a/python/delta/connect/tests/test_deltatable.py b/python/delta/connect/tests/test_deltatable.py
new file mode 100644
index 00000000000..afc18945cac
--- /dev/null
+++ b/python/delta/connect/tests/test_deltatable.py
@@ -0,0 +1,178 @@
+#
+# Copyright (2024) The Delta Lake Project Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import unittest
+import sys
+
+from delta.connect.testing.utils import DeltaTestCase
+
+path_to_delta_connect_tests_folder = os.path.dirname(os.path.abspath(__file__))
+path_to_delta_folder = os.path.dirname(os.path.dirname(path_to_delta_connect_tests_folder))
+sys.path.append(path_to_delta_folder)
+
+from tests.test_deltatable import DeltaTableTestsMixin
+
+
+class DeltaTableTests(DeltaTableTestsMixin, DeltaTestCase):
+    @unittest.skip("delete has not been implemented yet")
+    def test_delete(self):
+        pass
+
+    @unittest.skip("generate has not been implemented yet")
+    def test_generate(self):
+        pass
+
+    @unittest.skip("update has not been implemented yet")
+    def test_update(self):
+        pass
+
+    @unittest.skip("merge has not been implemented yet")
+    def test_merge(self):
+        pass
+
+    @unittest.skip("merge has not been implemented yet")
+    def test_merge_with_inconsistent_sessions(self):
+        pass
+
+    @unittest.skip("history has not been implemented yet")
+    def test_history(self):
+        pass
+
+    @unittest.skip("cdc has not been implemented yet")
+    def test_cdc(self):
+        pass
+
+    @unittest.skip("detail has not been implemented yet")
+    def test_detail(self):
+        pass
+
+    @unittest.skip("vacuum has not been implemented yet")
+    def test_vacuum(self):
+        pass
+
+    @unittest.skip("convertToDelta has not been implemented yet")
+    def test_convertToDelta(self):
+        pass
+
+    @unittest.skip("isDeltaTable has not been implemented yet")
+    def test_isDeltaTable(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_with_existing_schema(self):
+        pass
+
+    @unittest.skip("createOrReplace has not been implemented yet")
+    def test_create_replace_table_with_cluster_by(self):
+        pass
+
+    @unittest.skip("createOrReplace has not been implemented yet")
+    def test_create_replace_table_with_no_spark_session_passed(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_with_name_only(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_with_location_only(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_with_name_and_location(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_behavior(self):
+        pass
+
+    @unittest.skip("replace has not been implemented yet")
+    def test_replace_table_with_name_only(self):
+        pass
+
+    @unittest.skip("replace has not been implemented yet")
+    def test_replace_table_with_location_only(self):
+        pass
+
+    @unittest.skip("replace has not been implemented yet")
+    def test_replace_table_with_name_and_location(self):
+        pass
+
+    @unittest.skip("replace has not been implemented yet")
+    def test_replace_table_behavior(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_verify_paritionedBy_compatibility(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_create_table_with_identity_column(self):
+        pass
+
+    @unittest.skip("create has not been implemented yet")
+    def test_delta_table_builder_with_bad_args(self):
+        pass
+
+    @unittest.skip("upgradeToProtocol has not been implemented yet")
+    def test_protocolUpgrade(self):
+        pass
+
+    @unittest.skip("addFeatureSupport has not been implemented yet")
+    def test_addFeatureSupport(self):
+        pass
+
+    @unittest.skip("dropFeatureSupport has not been implemented yet")
+    def test_dropFeatureSupport(self):
+        pass
+
+    @unittest.skip("restoreToVersion has not been implemented yet")
+    def test_restore_to_version(self):
+        pass
+
+    @unittest.skip("restoreToTimestamp has not been implemented yet")
+    def test_restore_to_timestamp(self):
+        pass
+
+    @unittest.skip("restore has not been implemented yet")
+    def test_restore_invalid_inputs(self):
+        pass
+
+    @unittest.skip("optimize has not been implemented yet")
+    def test_optimize(self):
+        pass
+
+    @unittest.skip("optimize has not been implemented yet")
+    def test_optimize_w_partition_filter(self):
+        pass
+
+    @unittest.skip("optimize has not been implemented yet")
+    def test_optimize_zorder_by(self):
+        pass
+
+    @unittest.skip("optimize has not been implemented yet")
+    def test_optimize_zorder_by_w_partition_filter(self):
+        pass
+
+
+if __name__ == "__main__":
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=4)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=4)
diff --git a/python/delta/tables.py b/python/delta/tables.py
index 23933f928a0..8cadffd2642 100644
--- a/python/delta/tables.py
+++ b/python/delta/tables.py
@@ -381,6 +381,15 @@ def forPath(
         """
         assert sparkSession is not None
 
+        if (sparkSession.version.split(".")[0] == "4"):
+            # Delta Connect is only supported in Delta 4.0+ on Spark 4.0+
+            # Only import if using Delta/Spark 4.0+ since the import fails without grpc installed
+            from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
+
+            if isinstance(sparkSession, RemoteSparkSession):
+                from delta.connect.tables import DeltaTable as RemoteDeltaTable
+                return RemoteDeltaTable.forPath(sparkSession, path, hadoopConf)
+
         jvm: "JVMView" = sparkSession._sc._jvm  # type: ignore[attr-defined]
         jsparkSession: "JavaObject" = sparkSession._jsparkSession  # type: ignore[attr-defined]
 
@@ -413,6 +422,15 @@ def forName(
         """
         assert sparkSession is not None
 
+        if (sparkSession.version.split(".")[0] == "4"):
+            # Delta Connect is only supported in Delta 4.0+ on Spark 4.0+
+            # Only import if using Delta/Spark 4.0+ since the import fails without grpc installed
+            from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
+
+            if isinstance(sparkSession, RemoteSparkSession):
+                from delta.connect.tables import DeltaTable as RemoteDeltaTable
+                return RemoteDeltaTable.forName(sparkSession, tableOrViewName)
+
         jvm: "JVMView" = sparkSession._sc._jvm  # type: ignore[attr-defined]
         jsparkSession: "JavaObject" = sparkSession._jsparkSession  # type: ignore[attr-defined]
 
diff --git a/python/run-tests.py b/python/run-tests.py
index 73f2f4c2f4a..ba119ddb3a9 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -39,7 +39,8 @@ def test(root_dir, code_dir, packages):
            "--driver-class-path=%s" % extra_class_path,
            "--repositories",
            ("https://maven-central.storage-download.googleapis.com/maven2/,"
-            "https://repo1.maven.org/maven2/"),
+            "https://repo1.maven.org/maven2/,"
+            "https://repository.apache.org/content/repositories/orgapachespark-1480"),
            "--packages", ",".join(packages), test_file]
     print("Running tests in %s\n=============" % test_file)
     print("Command: %s" % str(cmd))
@@ -201,3 +202,10 @@ def run_delta_connect_codegen_python(root_dir):
 # For versions 4.0+ run Delta Connect tests as well
 if use_spark_master:
     run_delta_connect_codegen_python(root_dir)
+    # TODO: In the future, find a way to get these
+    # packages locally instead of downloading from Maven.
+    delta_connect_packages = ["com.google.protobuf:protobuf-java:3.25.1",
+                              "org.apache.spark:spark-connect_2.13:4.0.0",
+                              get_local_package("delta-connect-server", use_spark_master)]
+
+    test(root_dir, path.join("delta", "connect"), delta_connect_packages)
diff --git a/run-integration-tests.py b/run-integration-tests.py
index a2547d1c262..817c1f13300 100755
--- a/run-integration-tests.py
+++ b/run-integration-tests.py
@@ -76,7 +76,7 @@ def run_python_integration_tests(root_dir, version, test_name, extra_maven_repo,
     run_cmd(["build/sbt", "publishM2"])
 
     test_dir = path.join(root_dir, path.join("examples", "python"))
-    files_to_skip = {"using_with_pip.py", "missing_delta_storage_jar.py", "image_storage.py"}
+    files_to_skip = {"using_with_pip.py", "missing_delta_storage_jar.py", "image_storage.py", "delta_connect.py"}
 
     test_files = [path.join(test_dir, f) for f in os.listdir(test_dir)
                   if path.isfile(path.join(test_dir, f)) and
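
[Editorial note: a closing sketch, not part of the patch, showing the effect of the dispatch added to python/delta/tables.py above. On Spark 4.x, building a session with .remote() (or with SPARK_REMOTE set) yields a Connect session, so DeltaTable.forPath hands off to the delta.connect implementation, while a classic local session keeps the JVM-backed path. The path below is illustrative.]

import os
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# A Connect session: forPath returns the client-side implementation.
spark = SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()
dt = DeltaTable.forPath(spark, "/tmp/delta_connect")
print(type(dt).__module__)  # expected: delta.connect.tables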