2727from google .api_core .exceptions import FailedPrecondition , InvalidArgument , NotFound
2828from google .cloud .dataproc_v1 .types import sessions
2929
30- from google .cloud .dataproc_spark_connect .client import DataprocChannelBuilder
30+ from google .cloud .spark_connect .client import DataprocChannelBuilder
3131from google .cloud .dataproc_v1 import (
3232 CreateSessionRequest ,
3333 GetSessionRequest ,
4747logger = logging .getLogger (__name__ )
4848
4949
50- class DataprocSparkSession (SparkSession ):
50+ class GoogleSparkSession (SparkSession ):
5151 """The entry point to programming Spark with the Dataset and DataFrame API.
5252
5353 A DataprocRemoteSparkSession can be used to create :class:`DataFrame`, register :class:`DataFrame` as
@@ -59,7 +59,7 @@ class DataprocSparkSession(SparkSession):
5959 Create a Spark session with Dataproc Spark Connect.
6060
6161 >>> spark = (
62- ... DataprocSparkSession .builder
62+ ... GoogleSparkSession .builder
6363 ... .appName("Word Count")
6464 ... .dataprocConfig(Session())
6565 ... .getOrCreate()
@@ -130,25 +130,25 @@ def dataprocConfig(self, dataproc_config: Session):
130130 def remote (self , url : Optional [str ] = None ) -> "SparkSession.Builder" :
131131 if url :
132132 raise NotImplemented (
133- "DataprocSparkSession does not support connecting to an existing remote server"
133+ "GoogleSparkSession does not support connecting to an existing remote server"
134134 )
135135 else :
136136 return self
137137
138138 def create (self ) -> "SparkSession" :
139139 raise NotImplemented (
140- "DataprocSparkSession allows session creation only through getOrCreate"
140+ "GoogleSparkSession allows session creation only through getOrCreate"
141141 )
142142
143143 def __create_spark_connect_session_from_s8s (
144144 self , session_response
145145 ) -> "SparkSession" :
146- DataprocSparkSession ._active_s8s_session_uuid = (
146+ GoogleSparkSession ._active_s8s_session_uuid = (
147147 session_response .uuid
148148 )
149- DataprocSparkSession ._project_id = self ._project_id
150- DataprocSparkSession ._region = self ._region
151- DataprocSparkSession ._client_options = self ._client_options
149+ GoogleSparkSession ._project_id = self ._project_id
150+ GoogleSparkSession ._region = self ._region
151+ GoogleSparkSession ._client_options = self ._client_options
152152 spark_connect_url = session_response .runtime_info .endpoints .get (
153153 "Spark Connect Server"
154154 )
@@ -158,9 +158,9 @@ def __create_spark_connect_session_from_s8s(
158158 self ._channel_builder = DataprocChannelBuilder (url )
159159
160160 assert self ._channel_builder is not None
161- session = DataprocSparkSession (connection = self ._channel_builder )
161+ session = GoogleSparkSession (connection = self ._channel_builder )
162162
163- DataprocSparkSession ._set_default_and_active_session (session )
163+ GoogleSparkSession ._set_default_and_active_session (session )
164164 self .__apply_options (session )
165165 return session
166166
@@ -169,7 +169,7 @@ def __create(self) -> "SparkSession":
169169
170170 if self ._options .get ("spark.remote" , False ):
171171 raise NotImplemented (
172- "DataprocSparkSession does not support connecting to an existing remote server"
172+ "GoogleSparkSession does not support connecting to an existing remote server"
173173 )
174174
175175 from google .cloud .dataproc_v1 import SessionControllerClient
@@ -202,7 +202,7 @@ def __create(self) -> "SparkSession":
202202 )
203203
204204 logger .debug ("Creating serverless session" )
205- DataprocSparkSession ._active_s8s_session_id = session_id
205+ GoogleSparkSession ._active_s8s_session_id = session_id
206206 s8s_creation_start_time = time .time ()
207207 try :
208208 session_polling = retry .Retry (
@@ -246,12 +246,12 @@ def __create(self) -> "SparkSession":
246246 f"Exception while writing active session to file { file_path } , { e } "
247247 )
248248 except InvalidArgument as e :
249- DataprocSparkSession ._active_s8s_session_id = None
249+ GoogleSparkSession ._active_s8s_session_id = None
250250 raise RuntimeError (
251251 f"Error while creating serverless session: { e } "
252252 ) from None
253253 except Exception as e :
254- DataprocSparkSession ._active_s8s_session_id = None
254+ GoogleSparkSession ._active_s8s_session_id = None
255255 raise RuntimeError (
256256 f"Error while creating serverless session https://console.cloud.google.com/dataproc/interactive/{ self ._region } /{ session_id } : { e } "
257257 ) from None
@@ -286,12 +286,12 @@ def _is_s8s_session_active(
286286 return None
287287
288288 def _get_exiting_active_session (self ) -> Optional ["SparkSession" ]:
289- s8s_session_id = DataprocSparkSession ._active_s8s_session_id
289+ s8s_session_id = GoogleSparkSession ._active_s8s_session_id
290290 session_response = self ._is_s8s_session_active (s8s_session_id )
291291
292- session = DataprocSparkSession .getActiveSession ()
292+ session = GoogleSparkSession .getActiveSession ()
293293 if session is None :
294- session = DataprocSparkSession ._default_session
294+ session = GoogleSparkSession ._default_session
295295
296296 if session_response is not None :
297297 print (
@@ -312,7 +312,7 @@ def _get_exiting_active_session(self) -> Optional["SparkSession"]:
312312 return None
313313
314314 def getOrCreate (self ) -> "SparkSession" :
315- with DataprocSparkSession ._lock :
315+ with GoogleSparkSession ._lock :
316316 session = self ._get_exiting_active_session ()
317317 if session is None :
318318 session = self .__create ()
@@ -413,7 +413,7 @@ def _get_and_validate_version(self, dataproc_config, session_template):
413413 "dataproc-spark-connect"
414414 )
415415 client_version = importlib .metadata .version ("pyspark" )
416- version_message = f"Dataproc Spark Connect: { dataproc_connect_version } (PySpark: { client_version } ) Dataproc Session Runtime: { version } (Spark: { server_version } )"
416+ version_message = f"Spark Connect: { dataproc_connect_version } (PySpark: { client_version } ) Session Runtime: { version } (Spark: { server_version } )"
417417 logger .info (version_message )
418418 if trimmed_version (client_version ) != trimmed_version (
419419 server_version
@@ -454,7 +454,7 @@ def _repr_html_(self) -> str:
454454 <div>
455455 <p><b>Spark Connect</b></p>
456456
457- <p><a href="{ s8s_session } ">Dataproc Session</a></p>
457+ <p><a href="{ s8s_session } ">Serverless Session</a></p>
458458 <p><a href="{ ui } ">Spark UI</a></p>
459459 </div>
460460 """
@@ -473,15 +473,15 @@ def _remove_stoped_session_from_file(self):
473473 )
474474
475475 def stop (self ) -> None :
476- with DataprocSparkSession ._lock :
477- if DataprocSparkSession ._active_s8s_session_id is not None :
476+ with GoogleSparkSession ._lock :
477+ if GoogleSparkSession ._active_s8s_session_id is not None :
478478 from google .cloud .dataproc_v1 import SessionControllerClient
479479
480480 logger .debug (
481- f"Terminating serverless session: { DataprocSparkSession ._active_s8s_session_id } "
481+ f"Terminating serverless session: { GoogleSparkSession ._active_s8s_session_id } "
482482 )
483483 terminate_session_request = TerminateSessionRequest ()
484- session_name = f"projects/{ DataprocSparkSession ._project_id } /locations/{ DataprocSparkSession ._region } /sessions/{ DataprocSparkSession ._active_s8s_session_id } "
484+ session_name = f"projects/{ GoogleSparkSession ._project_id } /locations/{ GoogleSparkSession ._region } /sessions/{ GoogleSparkSession ._active_s8s_session_id } "
485485 terminate_session_request .name = session_name
486486 state = None
487487 try :
@@ -503,26 +503,26 @@ def stop(self) -> None:
503503 sleep (1 )
504504 except NotFound :
505505 logger .debug (
506- f"Session { DataprocSparkSession ._active_s8s_session_id } already deleted"
506+ f"Session { GoogleSparkSession ._active_s8s_session_id } already deleted"
507507 )
508508 except FailedPrecondition :
509509 logger .debug (
510- f"Session { DataprocSparkSession ._active_s8s_session_id } already terminated manually or terminated automatically through session ttl limits"
510+ f"Session { GoogleSparkSession ._active_s8s_session_id } already terminated manually or terminated automatically through session ttl limits"
511511 )
512512 if state is not None and state == Session .State .FAILED :
513513 raise RuntimeError ("Serverless session termination failed" )
514514
515515 self ._remove_stoped_session_from_file ()
516- DataprocSparkSession ._active_s8s_session_uuid = None
517- DataprocSparkSession ._active_s8s_session_id = None
518- DataprocSparkSession ._project_id = None
519- DataprocSparkSession ._region = None
520- DataprocSparkSession ._client_options = None
516+ GoogleSparkSession ._active_s8s_session_uuid = None
517+ GoogleSparkSession ._active_s8s_session_id = None
518+ GoogleSparkSession ._project_id = None
519+ GoogleSparkSession ._region = None
520+ GoogleSparkSession ._client_options = None
521521
522522 self .client .close ()
523- if self is DataprocSparkSession ._default_session :
524- DataprocSparkSession ._default_session = None
523+ if self is GoogleSparkSession ._default_session :
524+ GoogleSparkSession ._default_session = None
525525 if self is getattr (
526- DataprocSparkSession ._active_session , "session" , None
526+ GoogleSparkSession ._active_session , "session" , None
527527 ):
528- DataprocSparkSession ._active_session .session = None
528+ GoogleSparkSession ._active_session .session = None
0 commit comments