
Commit a6e07bc

Try to cherry-pick delta connect python changes
Copied from #3535
1 parent b976a38, commit a6e07bc

14 files changed: +558 −1 lines changed

dev/delta-connect-gen-protos.sh

Lines changed: 1 addition & 0 deletions
@@ -73,6 +73,7 @@ for f in `find gen/proto/python/delta/connect -name "*.py*"`; do
       -e 's/import spark.connect./import pyspark.sql.connect.proto./g' \
       -e 's/from delta.connect import/from delta.connect.proto import/g' \
       -e "s/DESCRIPTOR, 'delta.connect/DESCRIPTOR, 'delta.connect.proto/g" \
+      -e 's/from delta.connect import/from delta.connect.proto import/g' \
       $f > $f.tmp
     mv $f.tmp $f
   elif [[ $f == *.pyi ]]; then
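Note: these sed expressions rewrite the import statements that protoc emits so the generated modules resolve under the delta.connect.proto package. A rough Python equivalent of the rewrites, run on a hypothetical generated import line (sketch only, not part of the commit):

import re

line = "from delta.connect import base_pb2 as delta_dot_connect_dot_base__pb2"
line = re.sub(r"import spark\.connect\.", "import pyspark.sql.connect.proto.", line)
line = re.sub(r"from delta\.connect import", "from delta.connect.proto import", line)
line = re.sub(r"DESCRIPTOR, 'delta\.connect", "DESCRIPTOR, 'delta.connect.proto", line)
print(line)  # from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2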

dev/requirements.txt

Lines changed: 5 additions & 0 deletions
@@ -5,5 +5,10 @@ flake8==3.9.0
 # Code Formatter
 black==23.9.1

+# Spark Connect (required)
+grpcio>=1.62.0
+grpcio-status>=1.62.0
+googleapis-common-protos>=1.56.4
+
 # Spark and Delta Connect python proto generation plugin (optional)
 mypy-protobuf==3.3.0
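Note: the three new packages are the gRPC runtime pieces the Spark Connect client needs. A quick sanity check that they are importable (sketch, not part of the commit; module names follow each package's documentation):

import grpc                                # provided by grpcio
from grpc_status import rpc_status         # provided by grpcio-status
from google.rpc import error_details_pb2   # provided by googleapis-common-protos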

examples/python/delta_connect.py

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
To run this example you must follow these steps:

Requirements:
- Using Java 17
- Spark 4.0.0-preview1+
- delta-spark (python package) 4.0.0rc1+ and pyspark 4.0.0.dev1+

(1) Start a local Spark Connect server using this command:
sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.13:4.0.0-preview1,io.delta:delta-connect-server_2.13:{DELTA_VERSION},io.delta:delta-spark_2.13:{DELTA_VERSION},com.google.protobuf:protobuf-java:3.25.1 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.connect.extensions.relation.classes"="org.apache.spark.sql.connect.delta.DeltaRelationPlugin" \
  --conf "spark.connect.extensions.command.classes"="org.apache.spark.sql.connect.delta.DeltaCommandPlugin"
* Be sure to replace DELTA_VERSION with the version you are using.

(2) Set the SPARK_REMOTE environment variable to point to your local Spark server:
export SPARK_REMOTE="sc://localhost:15002"

(3) Run this file, e.g. python3 examples/python/delta_connect.py
"""

import os
import shutil

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

filePath = "/tmp/delta_connect"
tableName = "delta_connect_table"

def assert_dataframe_equals(df1, df2):
    # Compare sorted copies: list.sort() sorts in place and returns None,
    # so comparing its results would always succeed.
    assert sorted(df1.collect()) == sorted(df2.collect())

def cleanup(spark):
    shutil.rmtree(filePath, ignore_errors=True)
    spark.sql(f"DROP TABLE IF EXISTS {tableName}")

# --------------------- Set up Spark Connect spark session ------------------------

assert os.getenv("SPARK_REMOTE"), "Must point to Spark Connect server using SPARK_REMOTE"

spark = SparkSession.builder \
    .appName("delta_connect") \
    .remote(os.getenv("SPARK_REMOTE")) \
    .getOrCreate()

# Clean up any previous runs
cleanup(spark)

# -------------- Try reading non-existent table (should fail with an exception) ----------------

# Using forPath
try:
    DeltaTable.forPath(spark, filePath).toDF().show()
except Exception as e:
    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
    assert False, "Expected exception to be thrown for missing table"

# Using forName
try:
    DeltaTable.forName(spark, tableName).toDF().show()
except Exception as e:
    assert "DELTA_MISSING_DELTA_TABLE" in str(e)
else:
    assert False, "Expected exception to be thrown for missing table"

# ------------------------ Write basic table and check that results match ----------------------

# By table name
spark.range(5).write.format("delta").saveAsTable(tableName)
assert_dataframe_equals(DeltaTable.forName(spark, tableName).toDF(), spark.range(5))
assert_dataframe_equals(spark.read.format("delta").table(tableName), spark.range(5))
assert_dataframe_equals(spark.sql(f"SELECT * FROM {tableName}"), spark.range(5))

# By table path
spark.range(10).write.format("delta").save(filePath)
assert_dataframe_equals(DeltaTable.forPath(spark, filePath).toDF(), spark.range(10))
assert_dataframe_equals(spark.read.format("delta").load(filePath), spark.range(10))
assert_dataframe_equals(spark.sql(f"SELECT * FROM delta.`{filePath}`"), spark.range(10))

# ---------------------------------- Clean up ----------------------------------------
cleanup(spark)

python/delta/connect/__init__.py

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.tables import DeltaTable

__all__ = ['DeltaTable']

python/delta/connect/plan.py

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Optional

import delta.connect.proto as proto

from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.plan import LogicalPlan
import pyspark.sql.connect.proto as spark_proto


class DeltaLogicalPlan(LogicalPlan):
    def __init__(self, child: Optional[LogicalPlan]) -> None:
        super().__init__(child)

    def plan(self, session: SparkConnectClient) -> spark_proto.Relation:
        plan = spark_proto.Relation()
        plan.extension.Pack(self.to_delta_relation(session))
        return plan

    def to_delta_relation(self, session: SparkConnectClient) -> proto.DeltaRelation:
        ...

    def command(self, session: SparkConnectClient) -> spark_proto.Command:
        command = spark_proto.Command()
        command.extension.Pack(self.to_delta_command(session))
        return command

    def to_delta_command(self, session: SparkConnectClient) -> proto.DeltaCommand:
        ...


class DeltaScan(DeltaLogicalPlan):
    def __init__(self, table: proto.DeltaTable) -> None:
        super().__init__(None)
        self._table = table

    def to_delta_relation(self, client: SparkConnectClient) -> proto.DeltaRelation:
        relation = proto.DeltaRelation()
        relation.scan.table.CopyFrom(self._table)
        return relation
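Note: Relation.extension and Command.extension are google.protobuf.Any fields on Spark Connect's messages, so Pack embeds the Delta-specific message together with its type URL, and the server-side DeltaRelationPlugin can recognize and unpack it. A minimal sketch of that round trip (illustrative, not part of the commit), assuming the generated proto classes above:

import delta.connect.proto as proto
import pyspark.sql.connect.proto as spark_proto

# Client side: pack a Delta-specific relation into the generic extension field.
table = proto.DeltaTable()
table.path.path = "/tmp/delta_connect"
delta_relation = proto.DeltaRelation()
delta_relation.scan.table.CopyFrom(table)
relation = spark_proto.Relation()
relation.extension.Pack(delta_relation)

# Receiver side (e.g. DeltaRelationPlugin on the server): check the type URL, then unpack.
unpacked = proto.DeltaRelation()
assert relation.extension.Is(unpacked.DESCRIPTOR)
relation.extension.Unpack(unpacked)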
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from delta.connect.proto.base_pb2 import *
from delta.connect.proto.commands_pb2 import *
from delta.connect.proto.relations_pb2 import *
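Note: these wildcard re-exports are what let plan.py and tables.py write "import delta.connect.proto as proto" and refer to generated messages without naming the individual *_pb2 modules, e.g. (sketch, not part of the commit):

import delta.connect.proto as proto

table = proto.DeltaTable()        # re-exported from one of the *_pb2 modules above
relation = proto.DeltaRelation()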

python/delta/connect/tables.py

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Dict, Optional

from delta.connect.plan import DeltaScan
import delta.connect.proto as proto
from delta.tables import DeltaTable as LocalDeltaTable

from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.plan import LogicalPlan, SubqueryAlias
from pyspark.sql.connect.session import SparkSession


class DeltaTable(object):
    def __init__(
        self,
        spark: SparkSession,
        path: Optional[str] = None,
        tableOrViewName: Optional[str] = None,
        hadoopConf: Dict[str, str] = dict(),
        plan: Optional[LogicalPlan] = None
    ) -> None:
        self._spark = spark
        self._path = path
        self._tableOrViewName = tableOrViewName
        self._hadoopConf = hadoopConf
        if plan is not None:
            self._plan = plan
        else:
            self._plan = DeltaScan(self._to_proto())

    def toDF(self) -> DataFrame:
        return DataFrame(self._plan, session=self._spark)

    def alias(self, aliasName: str) -> "DeltaTable":
        return DeltaTable(
            self._spark,
            self._path,
            self._tableOrViewName,
            self._hadoopConf,
            SubqueryAlias(self._plan, aliasName)
        )

    @classmethod
    def forPath(
        cls,
        sparkSession: SparkSession,
        path: str,
        hadoopConf: Dict[str, str] = dict()
    ) -> "DeltaTable":
        assert sparkSession is not None
        return DeltaTable(sparkSession, path=path, hadoopConf=hadoopConf)

    @classmethod
    def forName(
        cls, sparkSession: SparkSession, tableOrViewName: str
    ) -> "DeltaTable":
        assert sparkSession is not None
        return DeltaTable(sparkSession, tableOrViewName=tableOrViewName)

    def _to_proto(self) -> proto.DeltaTable:
        result = proto.DeltaTable()
        if self._path is not None:
            result.path.path = self._path
        if self._tableOrViewName is not None:
            result.table_or_view_name = self._tableOrViewName
        return result


DeltaTable.__doc__ = LocalDeltaTable.__doc__
DeltaTable.toDF.__doc__ = LocalDeltaTable.toDF.__doc__
DeltaTable.alias.__doc__ = LocalDeltaTable.alias.__doc__
DeltaTable.forPath.__func__.__doc__ = LocalDeltaTable.forPath.__doc__
DeltaTable.forName.__func__.__doc__ = LocalDeltaTable.forName.__doc__
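Note: a usage sketch of the Connect client wrapper (illustrative, not part of the commit), assuming a Spark Connect server configured as in examples/python/delta_connect.py and SPARK_REMOTE set:

import os
from pyspark.sql import SparkSession
from delta.connect import DeltaTable

spark = SparkSession.builder.remote(os.environ["SPARK_REMOTE"]).getOrCreate()

# forName/forPath only build a DeltaScan plan locally; the server resolves the
# table when the DataFrame is actually executed.
dt = DeltaTable.forName(spark, "delta_connect_table")
dt.alias("t").toDF().show()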
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

python/delta/connect/testing/utils.py

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import tempfile
import shutil
import os

from pyspark import SparkConf
from pyspark.testing.connectutils import ReusedConnectTestCase


class DeltaTestCase(ReusedConnectTestCase):
    """
    Test suite base for setting up a properly configured SparkSession for using Delta Connect.
    """

    @classmethod
    def conf(cls) -> SparkConf:
        _conf = super(DeltaTestCase, cls).conf()
        _conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        _conf.set("spark.sql.catalog.spark_catalog",
                  "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        _conf.set("spark.connect.extensions.relation.classes",
                  "org.apache.spark.sql.connect.delta.DeltaRelationPlugin")
        _conf.set("spark.connect.extensions.command.classes",
                  "org.apache.spark.sql.connect.delta.DeltaCommandPlugin")
        return _conf

    def setUp(self) -> None:
        super(DeltaTestCase, self).setUp()
        self.tempPath = tempfile.mkdtemp()
        self.tempFile = os.path.join(self.tempPath, "tempFile")

    def tearDown(self) -> None:
        super(DeltaTestCase, self).tearDown()
        shutil.rmtree(self.tempPath)
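Note: a sketch of how a test might build on this base class (the test class, method, and assertions are illustrative, not part of the commit):

from delta.connect.tables import DeltaTable
from delta.connect.testing.utils import DeltaTestCase


class DeltaTableTests(DeltaTestCase):  # hypothetical test suite
    def test_for_path_round_trip(self) -> None:
        # self.spark and self.tempFile come from the base classes' setUp.
        self.spark.range(5).write.format("delta").save(self.tempFile)
        df = DeltaTable.forPath(self.spark, self.tempFile).toDF()
        self.assertEqual(sorted(row.id for row in df.collect()), [0, 1, 2, 3, 4])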
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
