Skip to content

Commit 35b101a

Browse files
authored
remove usage of ray.worker.global_worker.connected (#445)
1 parent 10d1c2d commit 35b101a

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

python/raydp/tests/test_data_owner_transfer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import ray
88
from ray._private.client_mode_hook import client_mode_wrap
99
from ray.exceptions import RayTaskError, OwnerDiedError
10+
import ray.util.client as ray_client
1011
import raydp
1112
from raydp.spark import PartitionObjectsOwner
1213
from pyspark.sql import SparkSession
@@ -42,7 +43,7 @@ def test_fail_without_data_ownership_transfer(ray_cluster, jdk17_extra_spark_con
4243

4344
# skipping this to be compatible with ray 2.4.0
4445
# see issue #343
45-
if not ray.worker.global_worker.connected:
46+
if ray_client.ray.is_connected():
4647
pytest.skip("Skip this test if using ray client")
4748

4849
from raydp.spark.dataset import spark_dataframe_to_ray_dataset
@@ -90,7 +91,7 @@ def test_data_ownership_transfer(ray_cluster, jdk17_extra_spark_configs):
9091
This test should be able to execute till the end without crash as expected.
9192
"""
9293

93-
if not ray.worker.global_worker.connected:
94+
if ray_client.ray.is_connected():
9495
pytest.skip("Skip this test if using ray client")
9596

9697
from raydp.spark.dataset import spark_dataframe_to_ray_dataset
@@ -153,7 +154,7 @@ def wake(self):
153154
def set_objects(self, objects):
154155
self.objects = objects
155156

156-
if not ray.worker.global_worker.connected:
157+
if ray_client.ray.is_connected():
157158
pytest.skip("Skip this test if using ray client")
158159

159160
from raydp.spark.dataset import spark_dataframe_to_ray_dataset

python/raydp/tests/test_mpi.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import pytest
2121
import ray
2222
from ray.util import placement_group, remove_placement_group
23+
import ray.util.client as ray_client
2324

2425
from raydp.mpi import create_mpi_job, MPIJobContext, WorkerContext
2526

@@ -28,7 +29,7 @@
2829
def test_mpi_start(ray_cluster):
2930
if platform.system() == "Darwin":
3031
pytest.skip("Skip MPI test on MacOS")
31-
if not ray.worker.global_worker.connected:
32+
if ray_client.ray.is_connected():
3233
pytest.skip("Skip MPI test if using ray client")
3334
job = create_mpi_job(job_name="test",
3435
world_size=2,
@@ -61,7 +62,7 @@ def func(context: WorkerContext):
6162
def test_mpi_get_rank_address(ray_cluster):
6263
if platform.system() == "Darwin":
6364
pytest.skip("Skip MPI test on MacOS")
64-
if not ray.worker.global_worker.connected:
65+
if ray_client.ray.is_connected():
6566
pytest.skip("Skip MPI test if using ray client")
6667
with create_mpi_job(job_name="test",
6768
world_size=2,
@@ -79,7 +80,7 @@ def test_mpi_get_rank_address(ray_cluster):
7980
def test_mpi_with_script_prepare_fn(ray_cluster):
8081
if platform.system() == "Darwin":
8182
pytest.skip("Skip MPI test on MacOS")
82-
if not ray.worker.global_worker.connected:
83+
if ray_client.ray.is_connected():
8384
pytest.skip("Skip MPI test if using ray client")
8485
def script_prepare_fn(context: MPIJobContext):
8586
context.add_env("is_test", "True")
@@ -106,7 +107,7 @@ def f(context: WorkerContext):
106107
def test_mpi_with_pg(ray_cluster):
107108
if platform.system() == "Darwin":
108109
pytest.skip("Skip MPI test on MacOS")
109-
if not ray.worker.global_worker.connected:
110+
if ray_client.ray.is_connected():
110111
pytest.skip("Skip MPI test if using ray client")
111112
pg = placement_group(bundles=[{"CPU": 2}], strategy="STRICT_SPREAD")
112113
with create_mpi_job(job_name="test",

python/raydp/tests/test_spark_cluster.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import raydp.utils as utils
3232
from raydp.spark.ray_cluster_master import RayDPSparkMaster, RAYDP_SPARK_MASTER_SUFFIX
3333
from ray.cluster_utils import Cluster
34+
import ray.util.client as ray_client
3435

3536
def test_spark(spark_on_ray_small):
3637
spark = spark_on_ray_small
@@ -162,7 +163,7 @@ def test_ray_dataset_roundtrip(jdk17_extra_spark_configs):
162163

163164
# skipping this to be compatible with ray 2.4.0
164165
# see issue #343
165-
if not ray.worker.global_worker.connected:
166+
if ray_client.ray.is_connected():
166167
pytest.skip("Skip this test if using ray client")
167168
spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["one", "two"])
168169
rows = [(r.one, r.two) for r in spark_df.take(3)]
@@ -188,7 +189,7 @@ def test_ray_dataset_roundtrip(jdk17_extra_spark_configs):
188189
def test_ray_dataset_to_spark(spark_on_ray_2_executors):
189190
# skipping this to be compatible with ray 2.4.0
190191
# see issue #343
191-
if not ray.worker.global_worker.connected:
192+
if ray_client.ray.is_connected():
192193
pytest.skip("Skip this test if using ray client")
193194
spark = spark_on_ray_2_executors
194195
n = 5

0 commit comments

Comments
 (0)