|
| 1 | +import sys |
| 2 | +import pytest |
| 3 | +import ray |
| 4 | +import raydp |
| 5 | +from ray.cluster_utils import Cluster |
| 6 | +from ray.util.state import list_actors |
| 7 | + |
| 8 | + |
| 9 | +def test_spark_master_memory_custom(jdk17_extra_spark_configs): |
| 10 | + cluster = Cluster( |
| 11 | + initialize_head=True, |
| 12 | + head_node_args={ |
| 13 | + "num_cpus": 2, |
| 14 | + "resources": {"master": 10}, |
| 15 | + "include_dashboard": True, |
| 16 | + "dashboard_port": 8270, |
| 17 | + }, |
| 18 | + ) |
| 19 | + ray.init(address=cluster.address, |
| 20 | + dashboard_port=cluster.head_node.dashboard_grpc_port, |
| 21 | + include_dashboard=True) |
| 22 | + |
| 23 | + custom_memory = 100 * 1024 * 1024 # 100MB in bytes |
| 24 | + configs = jdk17_extra_spark_configs.copy() |
| 25 | + # Config under test: set Spark Master actor memory via RayDP config |
| 26 | + configs["spark.ray.raydp_spark_master.actor.resource.memory"] = str(custom_memory) |
| 27 | + # Also require the master custom resource so the actor is scheduled on the head |
| 28 | + configs["spark.ray.raydp_spark_master.actor.resource.master"] = "1" |
| 29 | + |
| 30 | + app_name = "test_spark_master_memory_custom" |
| 31 | + |
| 32 | + spark = raydp.init_spark( |
| 33 | + app_name=app_name, |
| 34 | + num_executors=1, |
| 35 | + executor_cores=1, |
| 36 | + executor_memory="500M", |
| 37 | + configs=configs, |
| 38 | + ) |
| 39 | + |
| 40 | + # Trigger the Spark master / RayDPSparkMaster startup |
| 41 | + spark.createDataFrame([(1, 2)], ["a", "b"]).count() |
| 42 | + |
| 43 | + # RayDPSparkMaster name is app_name + RAYDP_SPARK_MASTER_SUFFIX |
| 44 | + master_actor_name = f"{app_name}_SPARK_MASTER" |
| 45 | + |
| 46 | + actor = ray.get_actor(master_actor_name) |
| 47 | + assert actor is not None |
| 48 | + |
| 49 | + # Query Ray state for this actor |
| 50 | + actor_state = list_actors(filters=[("actor_id", "=", actor._actor_id.hex())], detail=True)[0] |
| 51 | + resources = actor_state.required_resources |
| 52 | + |
| 53 | + assert resources["memory"] == custom_memory |
| 54 | + assert resources["master"] == 1 |
| 55 | + |
| 56 | + spark.stop() |
| 57 | + raydp.stop_spark() |
| 58 | + ray.shutdown() |
| 59 | + cluster.shutdown() |
| 60 | + |
| 61 | + |
| 62 | +if __name__ == "__main__": |
| 63 | + sys.exit(pytest.main(["-v", __file__])) |
| 64 | + |
| 65 | + |
0 commit comments