Skip to content

Commit 313dba4

Browse files
authored
refactor!: rename package and class to google.cloud.spark_connect.GoogleSparkSession (#21)
BREAKING CHANGE: Rename package to google.cloud.spark_connect.GoogleSparkSession
1 parent 64d49b7 commit 313dba4

File tree

9 files changed

+93
-93
lines changed

9 files changed

+93
-93
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
*~
33
.*.sw[nop]
44
.idea
5+
.DS_Store
56
__pycache__
67
build/
78
dist/
9+

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ If you are running the client outside of Google Cloud, you must set following en
3939

4040
.. code-block:: python
4141

42-
from google.cloud.dataproc_spark_connect import DataprocSparkSession
42+
from google.cloud.spark_connect import GoogleSparkSession
4343

4444
3. There are two ways to create a spark session,
4545

4646
1. Start a Spark session using properties defined in `DATAPROC_SPARK_CONNECT_SESSION_DEFAULT_CONFIG`:
4747

4848
.. code-block:: python
4949

50-
spark = DataprocSparkSession.builder.getOrCreate()
50+
spark = GoogleSparkSession.builder.getOrCreate()
5151

5252
2. Start a Spark session with the following code instead of using a config file:
5353

@@ -59,7 +59,7 @@ If you are running the client outside of Google Cloud, you must set following en
5959
dataproc_config.spark_connect_session = SparkConnectConfig()
6060
dataproc_config.environment_config.execution_config.subnetwork_uri = "<subnet>"
6161
dataproc_config.runtime_config.version = '3.0'
62-
spark = DataprocSparkSession.builder.dataprocConfig(dataproc_config).getOrCreate()
62+
spark = GoogleSparkSession.builder.dataprocConfig(dataproc_config).getOrCreate()
6363

6464
## Billing
6565
As this client runs the spark workload on Dataproc, your project will be billed as per [Dataproc Serverless Pricing](https://cloud.google.com/dataproc-serverless/pricing).

google/cloud/dataproc_spark_connect/__init__.py renamed to google/cloud/spark_connect/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from .session import DataprocSparkSession
14+
from .session import GoogleSparkSession
File renamed without changes.
File renamed without changes.
File renamed without changes.

google/cloud/dataproc_spark_connect/session.py renamed to google/cloud/spark_connect/session.py

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from google.api_core.exceptions import FailedPrecondition, InvalidArgument, NotFound
2828
from google.cloud.dataproc_v1.types import sessions
2929

30-
from google.cloud.dataproc_spark_connect.client import DataprocChannelBuilder
30+
from google.cloud.spark_connect.client import DataprocChannelBuilder
3131
from google.cloud.dataproc_v1 import (
3232
CreateSessionRequest,
3333
GetSessionRequest,
@@ -47,7 +47,7 @@
4747
logger = logging.getLogger(__name__)
4848

4949

50-
class DataprocSparkSession(SparkSession):
50+
class GoogleSparkSession(SparkSession):
5151
"""The entry point to programming Spark with the Dataset and DataFrame API.
5252
5353
A DataprocRemoteSparkSession can be used to create :class:`DataFrame`, register :class:`DataFrame` as
@@ -59,7 +59,7 @@ class DataprocSparkSession(SparkSession):
5959
Create a Spark session with Dataproc Spark Connect.
6060
6161
>>> spark = (
62-
... DataprocSparkSession.builder
62+
... GoogleSparkSession.builder
6363
... .appName("Word Count")
6464
... .dataprocConfig(Session())
6565
... .getOrCreate()
@@ -130,25 +130,23 @@ def dataprocConfig(self, dataproc_config: Session):
130130
def remote(self, url: Optional[str] = None) -> "SparkSession.Builder":
131131
if url:
132132
raise NotImplemented(
133-
"DataprocSparkSession does not support connecting to an existing remote server"
133+
"GoogleSparkSession does not support connecting to an existing remote server"
134134
)
135135
else:
136136
return self
137137

138138
def create(self) -> "SparkSession":
139139
raise NotImplemented(
140-
"DataprocSparkSession allows session creation only through getOrCreate"
140+
"GoogleSparkSession allows session creation only through getOrCreate"
141141
)
142142

143143
def __create_spark_connect_session_from_s8s(
144144
self, session_response
145145
) -> "SparkSession":
146-
DataprocSparkSession._active_s8s_session_uuid = (
147-
session_response.uuid
148-
)
149-
DataprocSparkSession._project_id = self._project_id
150-
DataprocSparkSession._region = self._region
151-
DataprocSparkSession._client_options = self._client_options
146+
GoogleSparkSession._active_s8s_session_uuid = session_response.uuid
147+
GoogleSparkSession._project_id = self._project_id
148+
GoogleSparkSession._region = self._region
149+
GoogleSparkSession._client_options = self._client_options
152150
spark_connect_url = session_response.runtime_info.endpoints.get(
153151
"Spark Connect Server"
154152
)
@@ -158,9 +156,9 @@ def __create_spark_connect_session_from_s8s(
158156
self._channel_builder = DataprocChannelBuilder(url)
159157

160158
assert self._channel_builder is not None
161-
session = DataprocSparkSession(connection=self._channel_builder)
159+
session = GoogleSparkSession(connection=self._channel_builder)
162160

163-
DataprocSparkSession._set_default_and_active_session(session)
161+
GoogleSparkSession._set_default_and_active_session(session)
164162
self.__apply_options(session)
165163
return session
166164

@@ -169,7 +167,7 @@ def __create(self) -> "SparkSession":
169167

170168
if self._options.get("spark.remote", False):
171169
raise NotImplemented(
172-
"DataprocSparkSession does not support connecting to an existing remote server"
170+
"GoogleSparkSession does not support connecting to an existing remote server"
173171
)
174172

175173
from google.cloud.dataproc_v1 import SessionControllerClient
@@ -202,7 +200,7 @@ def __create(self) -> "SparkSession":
202200
)
203201

204202
logger.debug("Creating serverless session")
205-
DataprocSparkSession._active_s8s_session_id = session_id
203+
GoogleSparkSession._active_s8s_session_id = session_id
206204
s8s_creation_start_time = time.time()
207205
try:
208206
session_polling = retry.Retry(
@@ -246,12 +244,12 @@ def __create(self) -> "SparkSession":
246244
f"Exception while writing active session to file {file_path} , {e}"
247245
)
248246
except InvalidArgument as e:
249-
DataprocSparkSession._active_s8s_session_id = None
247+
GoogleSparkSession._active_s8s_session_id = None
250248
raise RuntimeError(
251249
f"Error while creating serverless session: {e}"
252250
) from None
253251
except Exception as e:
254-
DataprocSparkSession._active_s8s_session_id = None
252+
GoogleSparkSession._active_s8s_session_id = None
255253
raise RuntimeError(
256254
f"Error while creating serverless session https://console.cloud.google.com/dataproc/interactive/{self._region}/{session_id} : {e}"
257255
) from None
@@ -286,12 +284,12 @@ def _is_s8s_session_active(
286284
return None
287285

288286
def _get_exiting_active_session(self) -> Optional["SparkSession"]:
289-
s8s_session_id = DataprocSparkSession._active_s8s_session_id
287+
s8s_session_id = GoogleSparkSession._active_s8s_session_id
290288
session_response = self._is_s8s_session_active(s8s_session_id)
291289

292-
session = DataprocSparkSession.getActiveSession()
290+
session = GoogleSparkSession.getActiveSession()
293291
if session is None:
294-
session = DataprocSparkSession._default_session
292+
session = GoogleSparkSession._default_session
295293

296294
if session_response is not None:
297295
print(
@@ -312,7 +310,7 @@ def _get_exiting_active_session(self) -> Optional["SparkSession"]:
312310
return None
313311

314312
def getOrCreate(self) -> "SparkSession":
315-
with DataprocSparkSession._lock:
313+
with GoogleSparkSession._lock:
316314
session = self._get_exiting_active_session()
317315
if session is None:
318316
session = self.__create()
@@ -413,7 +411,7 @@ def _get_and_validate_version(self, dataproc_config, session_template):
413411
"dataproc-spark-connect"
414412
)
415413
client_version = importlib.metadata.version("pyspark")
416-
version_message = f"Dataproc Spark Connect: {dataproc_connect_version} (PySpark: {client_version}) Dataproc Session Runtime: {version} (Spark: {server_version})"
414+
version_message = f"Spark Connect: {dataproc_connect_version} (PySpark: {client_version}) Session Runtime: {version} (Spark: {server_version})"
417415
logger.info(version_message)
418416
if trimmed_version(client_version) != trimmed_version(
419417
server_version
@@ -454,7 +452,7 @@ def _repr_html_(self) -> str:
454452
<div>
455453
<p><b>Spark Connect</b></p>
456454
457-
<p><a href="{s8s_session}">Dataproc Session</a></p>
455+
<p><a href="{s8s_session}">Serverless Session</a></p>
458456
<p><a href="{ui}">Spark UI</a></p>
459457
</div>
460458
"""
@@ -473,15 +471,15 @@ def _remove_stoped_session_from_file(self):
473471
)
474472

475473
def stop(self) -> None:
476-
with DataprocSparkSession._lock:
477-
if DataprocSparkSession._active_s8s_session_id is not None:
474+
with GoogleSparkSession._lock:
475+
if GoogleSparkSession._active_s8s_session_id is not None:
478476
from google.cloud.dataproc_v1 import SessionControllerClient
479477

480478
logger.debug(
481-
f"Terminating serverless session: {DataprocSparkSession._active_s8s_session_id}"
479+
f"Terminating serverless session: {GoogleSparkSession._active_s8s_session_id}"
482480
)
483481
terminate_session_request = TerminateSessionRequest()
484-
session_name = f"projects/{DataprocSparkSession._project_id}/locations/{DataprocSparkSession._region}/sessions/{DataprocSparkSession._active_s8s_session_id}"
482+
session_name = f"projects/{GoogleSparkSession._project_id}/locations/{GoogleSparkSession._region}/sessions/{GoogleSparkSession._active_s8s_session_id}"
485483
terminate_session_request.name = session_name
486484
state = None
487485
try:
@@ -503,26 +501,26 @@ def stop(self) -> None:
503501
sleep(1)
504502
except NotFound:
505503
logger.debug(
506-
f"Session {DataprocSparkSession._active_s8s_session_id} already deleted"
504+
f"Session {GoogleSparkSession._active_s8s_session_id} already deleted"
507505
)
508506
except FailedPrecondition:
509507
logger.debug(
510-
f"Session {DataprocSparkSession._active_s8s_session_id} already terminated manually or terminated automatically through session ttl limits"
508+
f"Session {GoogleSparkSession._active_s8s_session_id} already terminated manually or terminated automatically through session ttl limits"
511509
)
512510
if state is not None and state == Session.State.FAILED:
513511
raise RuntimeError("Serverless session termination failed")
514512

515513
self._remove_stoped_session_from_file()
516-
DataprocSparkSession._active_s8s_session_uuid = None
517-
DataprocSparkSession._active_s8s_session_id = None
518-
DataprocSparkSession._project_id = None
519-
DataprocSparkSession._region = None
520-
DataprocSparkSession._client_options = None
514+
GoogleSparkSession._active_s8s_session_uuid = None
515+
GoogleSparkSession._active_s8s_session_id = None
516+
GoogleSparkSession._project_id = None
517+
GoogleSparkSession._region = None
518+
GoogleSparkSession._client_options = None
521519

522520
self.client.close()
523-
if self is DataprocSparkSession._default_session:
524-
DataprocSparkSession._default_session = None
521+
if self is GoogleSparkSession._default_session:
522+
GoogleSparkSession._default_session = None
525523
if self is getattr(
526-
DataprocSparkSession._active_session, "session", None
524+
GoogleSparkSession._active_session, "session", None
527525
):
528-
DataprocSparkSession._active_session.session = None
526+
GoogleSparkSession._active_session.session = None

0 commit comments

Comments
 (0)