diff --git a/runhouse/cli_utils.py b/runhouse/cli_utils.py index b4f75941b..829e68bf7 100644 --- a/runhouse/cli_utils.py +++ b/runhouse/cli_utils.py @@ -226,18 +226,19 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste envs_in_cluster_headline = "Serving 🍦 :" console.print(envs_in_cluster_headline) - env_resource_mapping = { - env: servlet_processes[env]["env_resource_mapping"] for env in servlet_processes + process_resource_mapping = { + env: servlet_processes[env]["process_resource_mapping"] + for env in servlet_processes } - if len(env_resource_mapping) == 0: + if len(process_resource_mapping) == 0: console.print("This cluster has no environment nor resources.") first_envs_to_print = [] # First: if the default env does not have resources, print it. default_process_name = DEFAULT_PROCESS_NAME - if len(env_resource_mapping[default_process_name]) <= 1: + if len(process_resource_mapping[default_process_name]) <= 1: # case where the default env doesn't hve any other resources, apart from the default env itself. console.print(f"{BULLET_UNICODE} {default_process_name}") console.print( @@ -253,11 +254,11 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste # (the only resource they have is a runhouse.env, which is the env itself). first_envs_to_print = first_envs_to_print + [ env_name - for env_name in env_resource_mapping + for env_name in process_resource_mapping if ( - len(env_resource_mapping[env_name]) <= 1 + len(process_resource_mapping[env_name]) <= 1 and env_name != default_process_name - and env_resource_mapping[env_name] + and process_resource_mapping[env_name] ) ] @@ -267,19 +268,19 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste # * Else, we will print the resources (rh.function, th.module) associated with the env. envs_to_print = first_envs_to_print + [ env_name - for env_name in env_resource_mapping + for env_name in process_resource_mapping if env_name not in first_envs_to_print + [default_process_name] ] for env_name in envs_to_print: - resources_in_env = env_resource_mapping[env_name] + resources_in_env = process_resource_mapping[env_name] env_process_info = servlet_processes[env_name] env_name_txt = f"{BULLET_UNICODE} {env_name} | pid: {env_process_info['pid']} | node: {env_process_info['node_name']}" console.print(env_name_txt) # Print CPU info - env_cpu_info = env_process_info.get("env_cpu_usage") + env_cpu_info = env_process_info.get("process_cpu_usage") if env_cpu_info: # convert bytes to GB @@ -306,7 +307,7 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste console.print(cpu_usage_summary) # Print GPU info - env_gpu_info = env_process_info.get("env_gpu_usage") + env_gpu_info = env_process_info.get("process_gpu_usage") # sometimes the cluster has no GPU, therefore the env_gpu_info is an empty dictionary. 
if env_gpu_info: @@ -436,7 +437,7 @@ def print_status(status_data: dict, current_cluster) -> None: from runhouse.main import console cluster_config = status_data.get("cluster_config") - servlet_processes = status_data.get("env_servlet_processes") + servlet_processes = status_data.get("servlet_processes") cluster_name = cluster_config.get("name", None) if cluster_name: diff --git a/runhouse/resources/functions/function.py b/runhouse/resources/functions/function.py index 356a89809..07836de76 100644 --- a/runhouse/resources/functions/function.py +++ b/runhouse/resources/functions/function.py @@ -69,11 +69,11 @@ def to( name: Optional[str] = None, ): """ - Send the function to the specified env on the cluster. This will sync over relevant code and packages + Send the function to the specified process on the cluster. This will sync over relevant code and packages onto the cluster, and set up the environment if it does not yet exist on the cluster. Args: - system (str or Cluster): The system to setup the function and env on. + system (str or Cluster): The system to set up the function and process on. process (str or Dict, optional): The process to run the module on, if it's a Dict, it will be explicitly created with those args. or the set of requirements necessary to run the module. (Default: ``None``) name (Optional[str], optional): Name to give to the function resource, if you wish to rename it. diff --git a/runhouse/resources/functions/function_factory.py b/runhouse/resources/functions/function_factory.py index e43c1f078..5badd7ad5 100644 --- a/runhouse/resources/functions/function_factory.py +++ b/runhouse/resources/functions/function_factory.py @@ -17,7 +17,7 @@ def function( dryrun: bool = False, serialize_notebook_fn: bool = False, ): - """runhouse.function(fn: str | Callable | None = None, name: str | None = None, system: str | Cluster | None = None, env: str | List[str] | Env | None = None, dryrun: bool = False, load_secrets: bool = False, serialize_notebook_fn: bool = False) + """runhouse.function(fn: str | Callable | None = None, name: str | None = None, system: str | Cluster | None = None, dryrun: bool = False, serialize_notebook_fn: bool = False) Builds an instance of :class:`Function`. diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index 23f201ef7..f6abea157 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -608,7 +608,7 @@ def _sync_image_to_cluster(self): env_vars = {} log_level = os.getenv("RH_LOG_LEVEL") if log_level: - # add log level to the default env to ensure it gets set on the cluster when the server is restarted + # add log level to the default image to ensure it gets set on the cluster when the server is restarted env_vars["RH_LOG_LEVEL"] = log_level logger.info(f"Using log level {log_level} on cluster's default env") @@ -813,19 +813,19 @@ def put_resource( process (str, optional): Process of the object store to put the object in.
(Default: ``None``) """ # Logic to get env_name from different ways env can be provided - env_name = process or ( + process = process or ( resource.process if hasattr(resource, "process") else DEFAULT_PROCESS_NAME ) state = state or {} if self.on_this_cluster(): data = (resource.config(condensed=False), state, dryrun) - return obj_store.put_resource(serialized_data=data, process=env_name) + return obj_store.put_resource(serialized_data=data, process=process) return self.call_client_method( "put_resource", resource, state=state or {}, - process=env_name, + process=process, dryrun=dryrun, ) @@ -987,7 +987,7 @@ def status(self, send_to_den: bool = False): elif den_resp_status_code != 200: logger.warning("Failed to send cluster status to Den") - if not configs.observability_enabled and status.get("env_servlet_processes"): + if not configs.observability_enabled and status.get("servlet_processes"): logger.warning( "Cluster observability is disabled. Metrics are stale and will " "no longer be collected. To re-enable observability, please " diff --git a/runhouse/rns/rns_client.py b/runhouse/rns/rns_client.py index 5189546c6..37748a19d 100644 --- a/runhouse/rns/rns_client.py +++ b/runhouse/rns/rns_client.py @@ -32,7 +32,7 @@ class ResourceStatusData(BaseModel): server_gpu_utilization: Optional[float] server_memory_usage: Dict[str, Any] server_gpu_usage: Optional[Dict[str, Any]] - env_servlet_processes: Dict[str, Dict[str, Any]] + servlet_processes: Dict[str, Dict[str, Any]] server_pid: int runhouse_version: str diff --git a/runhouse/servers/cluster_servlet.py b/runhouse/servers/cluster_servlet.py index 93e98452a..cb3a56713 100644 --- a/runhouse/servers/cluster_servlet.py +++ b/runhouse/servers/cluster_servlet.py @@ -256,7 +256,7 @@ async def aget_servlet_name_for_key(self, key: Any) -> str: async def aput_servlet_name_for_key(self, key: Any, servlet_name: str): if not await self.ais_servlet_name_initialized(servlet_name): raise ValueError( - f"Env servlet name {servlet_name} not initialized, and you tried to mark a resource as in it." + f"Servlet name {servlet_name} not initialized, and you tried to mark a resource as in it." ) self._key_to_servlet_name[key] = servlet_name @@ -268,16 +268,16 @@ async def aclear_key_to_servlet_name_dict(self): self._key_to_servlet_name = {} ############################################## - # Remove Env Servlet + # Remove Servlet ############################################## async def aclear_all_references_to_servlet_name(self, servlet_name: str): - # using lock to prevent status thread access self._initialized_servlet_args before the env is deleted. + # using lock to prevent the status thread from accessing self._initialized_servlet_args before the process is deleted. with self.lock: del self._initialized_servlet_args[servlet_name] deleted_keys = [ key - for key, env in self._key_to_servlet_name.items() - if env == servlet_name + for key, process in self._key_to_servlet_name.items() - if process == servlet_name ] for key in deleted_keys: self._key_to_servlet_name.pop(key) @@ -292,7 +292,7 @@ async def asave_status_metrics_to_den(self, status: dict): # making a copy so the status won't be modified with pop, since it will be returned after sending to den. # (status is passed as pointer).
status_copy = copy.deepcopy(status) - servlet_processes = status_copy.pop("env_servlet_processes") + servlet_processes = status_copy.pop("servlet_processes") status_data = { "daemon_status": RunhouseDaemonStatus.RUNNING, @@ -300,7 +300,7 @@ async def asave_status_metrics_to_den(self, status: dict): "resource_type", "cluster" ), "resource_info": status_copy, - "env_servlet_processes": servlet_processes, + "servlet_processes": servlet_processes, } client = httpx.AsyncClient() @@ -486,14 +486,14 @@ async def _update_autostop(self, status: dict): function_running = any( any( len( - resource["env_resource_mapping"][resource_name].get( + resource["process_resource_mapping"][resource_name].get( "active_function_calls", [] ) ) > 0 - for resource_name in resource["env_resource_mapping"].keys() + for resource_name in resource["process_resource_mapping"].keys() ) - for resource in status.get("env_servlet_processes", {}).values() + for resource in status.get("servlet_processes", {}).values() ) if function_running: await self.autostop_helper.set_last_active_time_to_now() @@ -520,7 +520,7 @@ async def _status_for_servlet(self, servlet_name): } # Need to catch the exception here because we're running this in a gather, - # and need to know which env servlet failed + # and need to know which servlet failed except Exception as e: return {"servlet_name": servlet_name, "Exception": e} @@ -600,7 +600,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]] # Popping out creds because we don't want to show them in the status config_cluster.pop("creds", None) - # Getting data from each env servlet about the objects it contains and the utilization data + # Getting data from each servlet about the objects it contains and the utilization data servlet_utilization_data = {} with self.lock: servlets_status = await asyncio.gather( @@ -610,7 +610,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]] ], ) - # Store the data for the appropriate env servlet name + # Store the data for the appropriate servlet name for env_status in servlets_status: servlet_name = env_status.get("servlet_name") @@ -618,14 +618,14 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]] if "Exception" in env_status.keys(): e = env_status.get("Exception") logger.warning( - f"Exception {str(e)} in status for env servlet {servlet_name}" + f"Exception {str(e)} in status for servlet {servlet_name}" ) servlet_utilization_data[servlet_name] = {} else: - # Store what was in the env and the utilization data + # Store what was in the process and the utilization data env_memory_info = env_status.get("servlet_utilization_data") - env_memory_info["env_resource_mapping"] = env_status.get( + env_memory_info["process_resource_mapping"] = env_status.get( "objects_in_servlet" ) servlet_utilization_data[servlet_name] = env_memory_info @@ -672,7 +672,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]] "cluster_config": config_cluster, "runhouse_version": runhouse.__version__, "server_pid": self.pid, - "env_servlet_processes": servlet_utilization_data, + "servlet_processes": servlet_utilization_data, "server_cpu_utilization": cpu_utilization, "server_gpu_utilization": gpu_utilization, "server_memory_usage": memory_usage, diff --git a/runhouse/servers/http/http_server.py b/runhouse/servers/http/http_server.py index 4b1013ec5..1322be8f8 100644 --- a/runhouse/servers/http/http_server.py +++ b/runhouse/servers/http/http_server.py @@ -163,7 +163,7 
@@ async def ainitialize( ), ) - # We initialize a default env servlet where some things may run. + # We initialize a default servlet where some things may run. _ = obj_store.get_servlet( name=DEFAULT_PROCESS_NAME, create_process_params=CreateProcessParams( diff --git a/runhouse/servers/servlet.py b/runhouse/servers/servlet.py index 9ea97d759..40e3152db 100644 --- a/runhouse/servers/servlet.py +++ b/runhouse/servers/servlet.py @@ -107,8 +107,7 @@ async def __init__( # Ray defaults to setting OMP_NUM_THREADS to 1, which unexpectedly limit parallelism in user programs. # We delete it by default, but if we find that the user explicitly set it to another value, we respect that. # This is really only a factor if the user set the value inside the VM or container, or inside the base_env - # which a cluster was initialized with. If they set it inside the env constructor and the env was sent to the - # cluster normally with .to, it will be set after this point. + # which a cluster was initialized with. # TODO this had no effect when we did it below where we set CUDA_VISIBLE_DEVICES, so we may need to move that # here and mirror the same behavior (setting it based on the detected gpus in the whole cluster may not work # for multinode, but popping it may also break things, it needs to be tested). @@ -132,7 +131,7 @@ async def __init__( if is_gpu_cluster(): logger.debug("Creating _periodic_gpu_check thread.") collect_gpu_thread = threading.Thread( - target=self._collect_env_gpu_usage, daemon=True + target=self._collect_process_gpu_usage, daemon=True ) collect_gpu_thread.start() @@ -250,7 +249,7 @@ async def adelete_local(self, key: Any): async def aclear_local(self): return await obj_store.aclear_local() - def _get_env_cpu_usage(self, cluster_config: dict = None): + def _get_process_cpu_usage(self, cluster_config: dict = None): total_memory = psutil.virtual_memory().total node_ip = get_node_ip() @@ -292,7 +291,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None): node_index, ) - def _get_env_gpu_usage(self): + def _get_process_gpu_usage(self): # currently works correctly for a single node GPU. Multinode-clusters will be supported shortly. 
collected_gpus_info = copy.deepcopy(self.gpu_metrics) @@ -304,8 +303,8 @@ def _get_env_gpu_usage(self): collected_gpus_info=collected_gpus_info, servlet_type=ServletType.env ) - def _collect_env_gpu_usage(self): - """periodically collects env gpu usage""" + def _collect_process_gpu_usage(self): + """periodically collects gpu usage""" pynvml.nvmlInit() # init nvidia ml info collection @@ -366,11 +365,13 @@ def _status_local_helper(self): servlet_pid, node_ip, node_index, - ) = self._get_env_cpu_usage(cluster_config) + ) = self._get_process_cpu_usage(cluster_config) # Try loading GPU data (if relevant) - env_gpu_usage = ( - self._get_env_gpu_usage() if cluster_config.get("has_cuda", False) else {} + process_gpu_usage = ( + self._get_process_gpu_usage() + if cluster_config.get("has_cuda", False) + else {} ) cluster_config = obj_store.cluster_config @@ -391,11 +392,11 @@ def _status_local_helper(self): self.gpu_metrics = None servlet_utilization_data = { - "env_gpu_usage": env_gpu_usage, + "process_gpu_usage": process_gpu_usage, "node_ip": node_ip, "node_name": node_name, "node_index": node_index, - "env_cpu_usage": env_memory_usage, + "process_cpu_usage": env_memory_usage, "pid": servlet_pid, } diff --git a/tests/constants.py b/tests/constants.py index 82a37658f..12c5e1d52 100644 --- a/tests/constants.py +++ b/tests/constants.py @@ -12,3 +12,5 @@ os.getenv("RH_AUTOSTOP_INTERVAL") or TESTING_AUTOSTOP_INTERVAL ), } + +TEST_REQS = ["pytest", "httpx", "pytest_asyncio", "pandas", "numpy<=1.26.4"] diff --git a/tests/fixtures/docker_cluster_fixtures.py b/tests/fixtures/docker_cluster_fixtures.py index 6b1422bce..1eaa51066 100644 --- a/tests/fixtures/docker_cluster_fixtures.py +++ b/tests/fixtures/docker_cluster_fixtures.py @@ -18,7 +18,7 @@ from tests.conftest import init_args from tests.constants import TEST_ENV_VARS -from tests.utils import friend_account, test_env +from tests.utils import friend_account, setup_test_base SSH_USER = "rh-docker-user" BASE_LOCAL_SSH_PORT = 32320 @@ -278,7 +278,7 @@ def set_up_local_cluster( rh_cluster.restart_server(resync_rh=True) if not rh_cluster.image: - test_env(logged_in=logged_in).to(rh_cluster) + setup_test_base(rh_cluster, logged_in=logged_in) def cleanup(): docker_client.containers.get(container_name).stop() @@ -353,7 +353,7 @@ def docker_cluster_pk_ssh(request, test_org_rns_folder): """This basic cluster fixture is set up with: - Public key authentication - Caddy set up on startup to forward Runhouse HTTP server to port 443 - - Default env with Ray 2.30.0 + - Default image with Ray 2.30.0 """ # From pytest config detached = request.config.getoption("--detached") @@ -457,7 +457,7 @@ def docker_cluster_pk_http_exposed(request, test_rns_folder): - Public key authentication - Den auth disabled (to mimic VPC) - Caddy set up on startup to forward Runhouse HTTP Server to port 80 - - Default conda_env with Python 3.11 and Ray 2.30.0 + - Default conda image with Python 3.11 and Ray 2.30.0 """ # From pytest config detached = request.config.getoption("--detached") diff --git a/tests/fixtures/on_demand_cluster_fixtures.py b/tests/fixtures/on_demand_cluster_fixtures.py index 33621641b..37e966277 100644 --- a/tests/fixtures/on_demand_cluster_fixtures.py +++ b/tests/fixtures/on_demand_cluster_fixtures.py @@ -8,8 +8,8 @@ from runhouse.resources.images.image import Image from tests.conftest import init_args -from tests.constants import TEST_ENV_VARS -from tests.utils import test_env +from tests.constants import TEST_ENV_VARS, TEST_REQS +from tests.utils import 
setup_test_base NUM_OF_NODES = 2 @@ -19,7 +19,7 @@ def restart_server(request): return request.config.getoption("--restart-server") -def setup_test_cluster(args, request, create_env=False): +def setup_test_cluster(args, request, setup_base=False): cluster = rh.ondemand_cluster(**args) init_args[id(cluster)] = args cluster.up_if_not() @@ -28,8 +28,8 @@ def setup_test_cluster(args, request, create_env=False): cluster.save() - if create_env or not cluster.image: - test_env().to(cluster) + if setup_base or not cluster.image: + setup_test_base(cluster) return cluster @@ -52,7 +52,7 @@ def ondemand_cluster(request): @pytest.fixture(scope="session") def ondemand_aws_docker_cluster(request): """ - Note: Also used to test docker and default env with alternate Ray version. + Note: Also used to test docker and default process with alternate Ray version. """ image = ( Image(name="default_image") @@ -67,7 +67,7 @@ def ondemand_aws_docker_cluster(request): "image": image, "sky_kwargs": {"launch": {"retry_until_up": True}}, } - cluster = setup_test_cluster(args, request, create_env=True) + cluster = setup_test_cluster(args, request, setup_base=True) return cluster @@ -99,7 +99,7 @@ def ondemand_gcp_cluster(request): conda_env_name="base_env", conda_yaml={"dependencies": ["python=3.11"], "name": "base_env"}, ) - .install_packages(test_env().reqs + ["ray==2.30.0"], conda_env_name="base_env") + .install_packages(TEST_REQS + ["ray==2.30.0"], conda_env_name="base_env") .set_env_vars(env_vars=TEST_ENV_VARS) ) args = { @@ -203,7 +203,7 @@ def multinode_cpu_docker_conda_cluster(request): conda_env_name="base_env", conda_yaml={"dependencies": ["python=3.11"], "name": "base_env"}, ) - .install_packages(test_env().reqs + ["ray==2.30.0"], conda_env_name="base_env") + .install_packages(TEST_REQS + ["ray==2.30.0"], conda_env_name="base_env") ) args = { "name": "rh-cpu-multinode", diff --git a/tests/fixtures/static_cluster_fixtures.py b/tests/fixtures/static_cluster_fixtures.py index 1d707b68f..373885974 100644 --- a/tests/fixtures/static_cluster_fixtures.py +++ b/tests/fixtures/static_cluster_fixtures.py @@ -3,7 +3,7 @@ import runhouse as rh from tests.conftest import init_args -from tests.utils import test_env +from tests.utils import setup_test_base @pytest.fixture(scope="session") @@ -44,6 +44,6 @@ def static_cpu_pwd_cluster(): c.restart_server(resync_rh=True) init_args[id(c)] = args - test_env().to(c) + setup_test_base(c) return c diff --git a/tests/test_obj_store.py b/tests/test_obj_store.py index 7c89d7d46..80fa44a00 100644 --- a/tests/test_obj_store.py +++ b/tests/test_obj_store.py @@ -151,11 +151,11 @@ def test_pinning_and_arg_replacement(cluster): assert pin_fn("put_pin") == "Found in obj store!" 
-def test_put_resource(cluster, test_env): - test_env.name = "~/test_env" - cluster.put_resource(test_env) - assert cluster.call("test_env", "config", stream_logs=True) == test_env.config() - assert cluster.get("test_env").config() == test_env.config() +def test_put_resource(cluster): + resource = rh.function(fn=do_printing_and_logging, system=cluster) + cluster.put_resource(resource) + assert cluster.call(resource.name, "config", stream_logs=True) == resource.config() + assert cluster.get(resource.name).config() == resource.config() def serialization_helper_1(): diff --git a/tests/test_resources/test_clusters/cluster_tests.py b/tests/test_resources/test_clusters/cluster_tests.py index a30110817..a3e0877b2 100644 --- a/tests/test_resources/test_clusters/cluster_tests.py +++ b/tests/test_resources/test_clusters/cluster_tests.py @@ -132,9 +132,7 @@ def test_byo_proxy(static_cpu_pwd_cluster, local_folder): assert status == 0 assert stdout == "hi\n" - summer_func = rh.function(summer, env=rh.env(working_dir="local:./")).to( - static_cpu_pwd_cluster - ) + summer_func = rh.function(summer).to(static_cpu_pwd_cluster) assert summer_func(1, 2) == 3 static_cpu_pwd_cluster.put("test_obj", list(range(10))) diff --git a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py index 58a3a6469..90b793fb8 100644 --- a/tests/test_resources/test_clusters/test_cluster.py +++ b/tests/test_resources/test_clusters/test_cluster.py @@ -28,7 +28,7 @@ from runhouse.resources.images.image import ImageSetupStepType -from runhouse.utils import _process_env_vars, generate_default_name +from runhouse.utils import _process_env_vars import tests.test_resources.test_resource from tests.conftest import init_args @@ -99,7 +99,7 @@ def run_in_no_env(cmd): def run_node_all(cmd): - # This forces `cluster.run` to use ssh instead of calling an env run + # This forces `cluster.run` to use ssh instead of calling a process run return rh.here.run(cmd, node="all") @@ -227,7 +227,7 @@ def test_cluster_endpoint(self, cluster): 0 ] # getting the first element because the endpoint returns the status + response to den. 
assert status_data["cluster_config"]["resource_type"] == "cluster" - assert status_data["env_servlet_processes"] + assert status_data["servlet_processes"] assert isinstance(status_data["server_cpu_utilization"], float) assert status_data["server_memory_usage"] assert not status_data.get("server_gpu_usage", None) @@ -263,39 +263,32 @@ def test_cluster_objects(self, cluster): assert cluster.get(k2) == "v2" # Make new env - rh.env(reqs=["numpy"], name="numpy_env").to(cluster) - assert "numpy_env" in cluster.keys() + cluster.ensure_process_created("numpy_env") k3 = get_random_str() cluster.put(k3, "v3", process="numpy_env") assert k3 in cluster.keys() assert cluster.get(k3) == "v3" + @pytest.mark.skip("pending cluster.kill functionality") @pytest.mark.level("local") @pytest.mark.clustertest - def test_cluster_delete_env(self, cluster): - env1 = rh.env(reqs=["pytest"], name="env1").to(cluster) - env2 = rh.env(reqs=["pytest"], name="env2").to(cluster) - env3 = rh.env(reqs=["pytest"], name="env3") + def test_cluster_delete_process(self, cluster): + proc1 = cluster.ensure_process_created("p1") + proc2 = cluster.ensure_process_created("p2") - cluster.put("k1", "v1", process=env1.name) - cluster.put("k2", "v2", process=env2.name) - cluster.put_resource(env3, process=env1.name) + cluster.put("k1", "v1", process=proc1) + cluster.put("k2", "v2", process=proc2) # test delete env2 - assert cluster.get(env2.name) + assert cluster.get(proc2) assert cluster.get("k2") - cluster.delete(env2.name) - assert not cluster.get(env2.name) + cluster.delete(proc2) + assert not cluster.get(proc2) assert not cluster.get("k2") - # test delete env3, which doesn't affect env1 - assert cluster.get(env3.name) - - cluster.delete(env3.name) - assert not cluster.get(env3.name) - assert cluster.get(env1.name) + assert cluster.get(proc1) assert cluster.get("k1") @pytest.mark.level("local") @@ -473,9 +466,9 @@ def test_caller_token_propagated(self, cluster): @pytest.mark.level("local") @pytest.mark.clustertest def test_rh_status_pythonic(self, cluster): - worker_env = rh.env(reqs=["pytest", "pandas"], name="worker_env").to(cluster) - sleep_remote = rh.function(sleep_fn).to(cluster, process=worker_env.name) - cluster.put(key="status_key1", obj="status_value1", process="worker_env") + process = cluster.ensure_process_created("worker") + sleep_remote = rh.function(sleep_fn).to(cluster, process=process) + cluster.put(key="status_key1", obj="status_value1", process=process) # Run these in a separate thread so that the main thread can continue call_threads = [Thread(target=sleep_remote, args=[3]) for _ in range(3)] for call_thread in call_threads: @@ -486,7 +479,7 @@ def test_rh_status_pythonic(self, cluster): cluster_data = cluster.status() expected_cluster_status_data_keys = [ - "env_servlet_processes", + "servlet_processes", "server_pid", "runhouse_version", "cluster_config", @@ -510,22 +503,22 @@ def test_rh_status_pythonic(self, cluster): else: assert res.get("compute_properties").get("ips") == cluster.ips - assert "worker_env" in cluster_data.get("env_servlet_processes").keys() - assert "status_key1" in cluster_data.get("env_servlet_processes").get( - "worker_env" - ).get("env_resource_mapping") + assert process in cluster_data.get("servlet_processes").keys() + assert "status_key1" in cluster_data.get("servlet_processes").get(process).get( + "process_resource_mapping" + ) assert { "resource_type": "str", "active_function_calls": [], - } == cluster_data.get("env_servlet_processes").get("worker_env").get( - "env_resource_mapping" + 
} == cluster_data.get("servlet_processes").get(process).get( + "process_resource_mapping" ).get( "status_key1" ) sleep_calls = ( - cluster_data.get("env_servlet_processes") - .get("worker_env") - .get("env_resource_mapping") + cluster_data.get("servlet_processes") + .get(process) + .get("process_resource_mapping") .get("sleep_fn") .get("active_function_calls") ) @@ -542,9 +535,9 @@ def test_rh_status_pythonic(self, cluster): updated_status = cluster.status() # Check that the sleep calls are no longer active assert ( - updated_status.get("env_servlet_processes") - .get("worker_env") - .get("env_resource_mapping") + updated_status.get("servlet_processes") + .get(process) + .get("process_resource_mapping") .get("sleep_fn") .get("active_function_calls") == [] @@ -552,41 +545,42 @@ def test_rh_status_pythonic(self, cluster): # test memory usage info expected_servlet_keys = [ - "env_cpu_usage", - "env_gpu_usage", - "env_resource_mapping", "node_index", "node_ip", "node_name", "pid", + "process_cpu_usage", + "process_gpu_usage", + "process_resource_mapping", ] - envs_names = list(cluster_data.get("env_servlet_processes").keys()) - envs_names.sort() - assert "env_servlet_processes" in cluster_data.keys() - servlets_info = cluster_data.get("env_servlet_processes") - env_actors_keys = list(servlets_info.keys()) - env_actors_keys.sort() - assert envs_names == env_actors_keys - for env_name in envs_names: - servlet_info = servlets_info.get(env_name) + expected_servlet_keys.sort() + process_names = list(cluster_data.get("servlet_processes").keys()) + process_names.sort() + assert "servlet_processes" in cluster_data.keys() + servlets_info = cluster_data.get("servlet_processes") + actors_keys = list(servlets_info.keys()) + actors_keys.sort() + assert process_names == actors_keys + for process_name in process_names: + servlet_info = servlets_info.get(process_name) servlet_info_keys = list(servlet_info.keys()) servlet_info_keys.sort() assert servlet_info_keys == expected_servlet_keys + @pytest.mark.skip("pending cluster.kill functionality") @pytest.mark.level("local") @pytest.mark.clustertest - def test_rh_status_pythonic_delete_env(self, cluster): - env_name = generate_default_name(prefix="env", precision="ms") - env = rh.env(reqs=["pytest"], name=env_name).to(cluster) - summer_temp = rh.function(summer).to(system=cluster, process=env.name) + def test_rh_status_pythonic_delete_process(self, cluster): + process = cluster.ensure_process_created("process_name") + summer_temp = rh.function(summer).to(system=cluster, process=process) call_summer_temp = summer_temp(1, 3) assert call_summer_temp == 4 - # make sure status is calculated properly before temp_env deletion. + # make sure status is calculated properly before process deletion. self.test_rh_status_pythonic(cluster=cluster) - cluster.delete(env.env_name) - # make sure status is calculated properly after temp_env deletion. + cluster.delete(process) + # make sure status is calculated properly after process deletion. 
self.test_rh_status_pythonic(cluster=cluster) def status_cli_test_logic(self, cluster, status_cli_command: str): @@ -704,12 +698,12 @@ def test_rh_status_stopped(self, cluster): def test_send_status_to_db(self, cluster): status = cluster.status() - servlet_processes = status.pop("env_servlet_processes") + servlet_processes = status.pop("servlet_processes") status_data = { "daemon_status": RunhouseDaemonStatus.RUNNING, "resource_type": status.get("cluster_config").get("resource_type"), "resource_info": status, - "env_servlet_processes": servlet_processes, + "servlet_processes": servlet_processes, } cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address) headers = rh.globals.rns_client.request_headers() @@ -733,17 +727,17 @@ def test_send_status_to_db(self, cluster): assert get_status_data["resource_info"] == status for k in servlet_processes: - if servlet_processes[k]["env_gpu_usage"] == {}: - servlet_processes[k]["env_gpu_usage"] = { + if servlet_processes[k]["process_gpu_usage"] == {}: + servlet_processes[k]["process_gpu_usage"] = { "used_memory": None, "utilization_percent": None, "total_memory": None, } servlet_processes = sort_servlet_processes(servlet_processes) - get_status_data["env_servlet_processes"] = sort_servlet_processes( - get_status_data["env_servlet_processes"] + get_status_data["servlet_processes"] = sort_servlet_processes( + get_status_data["servlet_processes"] ) - assert get_status_data["env_servlet_processes"] == servlet_processes + assert get_status_data["servlet_processes"] == servlet_processes status_data["daemon_status"] = RunhouseDaemonStatus.TERMINATED post_status_data_resp = requests.post( @@ -771,14 +765,14 @@ def test_send_status_to_db(self, cluster): assert post_status_data_resp.status_code in [200, 422] #################################################################################################### - # Default env tests + # Default process tests #################################################################################################### @pytest.mark.level("local") @pytest.mark.clustertest def test_default_process_in_status(self, cluster): res = cluster.status() - assert DEFAULT_PROCESS_NAME in res.get("env_servlet_processes") + assert DEFAULT_PROCESS_NAME in res.get("servlet_processes") @pytest.mark.level("local") @pytest.mark.clustertest @@ -1177,7 +1171,7 @@ def test_cluster_list_cmd_output_with_filters(self, capsys, cluster): ): cluster.save() # tls exposed local cluster is not saved by default - env = set_output_env_vars() + subprocess_env = set_output_env_vars() for status in [ClusterStatus.RUNNING, ClusterStatus.TERMINATED]: process = subprocess.Popen( @@ -1185,7 +1179,7 @@ def test_cluster_list_cmd_output_with_filters(self, capsys, cluster): shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env, + env=subprocess_env, ) process.wait() stdout = process.communicate()[0] diff --git a/tests/test_resources/test_clusters/test_multinode_cluster.py b/tests/test_resources/test_clusters/test_multinode_cluster.py index f368674c6..d51dd4071 100644 --- a/tests/test_resources/test_clusters/test_multinode_cluster.py +++ b/tests/test_resources/test_clusters/test_multinode_cluster.py @@ -65,37 +65,23 @@ def test_ray_started_on_worker_node_after_cluster_restart(self, cluster): @pytest.mark.level("release") def test_send_envs_to_specific_worker_node(self, cluster): - env_0 = rh.env( - name="worker_env_0", - reqs=["langchain", "pytest"], - ).to(cluster, node_idx=0) - - env_1 = rh.env( - name="worker_env_1", - reqs=["torch", 
"pytest"], - ).to(cluster, node_idx=1) - - env_2 = rh.env( - name="worker_env_2", - reqs=["transformers", "pytest"], - ).to(cluster, node_idx=2) + proc_0 = cluster.ensure_process_created("worker_0", compute={"node_idx": 0}) + proc_1 = cluster.ensure_process_created("worker_1", compute={"node_idx": 1}) + proc_2 = cluster.ensure_process_created("worker_2", compute={"node_idx": 1}) with pytest.raises(ValueError): - env_2.to( - cluster, - node_idx=len(cluster.ips), + cluster.ensure_process_created( + "worker_3", compute={"node_idx": len(cluster.ips)} ) - env_2.to(cluster, node_idx=1) - get_pid_0 = rh.function(get_pid_and_ray_node).to( - name="get_pid_0", system=cluster, process=env_0.name + name="get_pid_0", system=cluster, process=proc_0 ) get_pid_1 = rh.function(get_pid_and_ray_node).to( - name="get_pid_1", system=cluster, process=env_1.name + name="get_pid_1", system=cluster, process=proc_1 ) get_pid_2 = rh.function(get_pid_and_ray_node).to( - name="get_pid_2", system=cluster, process=env_2.name + name="get_pid_2", system=cluster, process=proc_2 ) with capture_stdout() as stdout_0: @@ -118,36 +104,25 @@ def test_send_envs_to_specific_worker_node(self, cluster): @pytest.mark.level("release") def test_specifying_resources(self, cluster): - env0 = rh.env( - name="worker_env_0", - compute={"CPU": 1.75}, - ).to(cluster) - - env1 = rh.env( - name="worker_env_1", - compute={"CPU": 0.5}, - ).to(cluster) - - env2 = rh.env( - name="worker_env_2", - compute={"memory": 4 * 1024 * 1024 * 1024}, - ).to(cluster) - - env3 = rh.env( - name="worker_env_3", - compute={"CPU": 0.1, "memory": 2 * 1024 * 1024 * 1024}, - ).to(cluster) + proc0 = cluster.ensure_process_created("worker_proc_0", compute={"CPU": 1.75}) + proc1 = cluster.ensure_process_created("worker_proc_1", compute={"CPU": 0.5}) + proc2 = cluster.ensure_process_created( + "worker_proc_2", compute={"memory": 4 * 1024 * 1024 * 1024} + ) + proc3 = cluster.ensure_process_created( + "worker_proc_3", compute={"CPU": 0.1, "memory": 2 * 1024 * 1024 * 1024} + ) status = cluster.status() - env0_node = status["env_servlet_processes"][env0.name]["node_ip"] - env1_node = status["env_servlet_processes"][env1.name]["node_ip"] - env2_node = status["env_servlet_processes"][env2.name]["node_ip"] - env3_node = status["env_servlet_processes"][env3.name]["node_ip"] - assert env0_node in cluster.internal_ips - assert env1_node in cluster.internal_ips - assert env2_node in cluster.internal_ips - assert env3_node in cluster.internal_ips - - assert env0_node != env1_node # Too much CPU - assert env2_node != env3_node # Too much memory + proc0_node = status["servlet_processes"][proc0]["node_ip"] + proc1_node = status["servlet_processes"][proc1]["node_ip"] + proc2_node = status["servlet_processes"][proc2]["node_ip"] + proc3_node = status["servlet_processes"][proc3]["node_ip"] + assert proc0_node in cluster.internal_ips + assert proc1_node in cluster.internal_ips + assert proc2_node in cluster.internal_ips + assert proc3_node in cluster.internal_ips + + assert proc0_node != proc1_node # Too much CPU + assert proc2_node != proc3_node # Too much memory diff --git a/tests/test_resources/test_clusters/test_on_demand_cluster.py b/tests/test_resources/test_clusters/test_on_demand_cluster.py index 5c6372a54..f1293cb81 100644 --- a/tests/test_resources/test_clusters/test_on_demand_cluster.py +++ b/tests/test_resources/test_clusters/test_on_demand_cluster.py @@ -146,47 +146,41 @@ def test_restart_does_not_change_config_yaml(self, cluster): @pytest.mark.level("minimal") def 
test_set_autostop(self, cluster): - rh.env( - working_dir="local:./", reqs=["pytest", "pandas"], name="autostop_env" - ).to(cluster) + process = cluster.ensure_process_created("autostop_env") - get_autostop = rh.fn(get_auotstop_from_on_cluster).to( - cluster, process="autostop_env" - ) + get_autostop = rh.fn(get_auotstop_from_on_cluster).to(cluster, process=process) # First check that the autostop is set to whatever the cluster set it to assert get_autostop() == cluster.autostop_mins original_autostop = cluster.autostop_mins set_autostop = rh.fn(set_autostop_from_on_cluster_via_ah).to( - cluster, process="autostop_env" + cluster, process=process ) set_autostop(5) assert get_autostop() == 5 set_autostop_via_cluster_keep_warm = rh.fn( set_autostop_from_on_cluster_via_cluster_keep_warm - ).to(cluster, process="autostop_env") + ).to(cluster, process=process) set_autostop_via_cluster_keep_warm() assert get_autostop() == -1 set_autostop_via_cluster_obj = rh.fn( set_autostop_from_on_cluster_via_cluster_obj - ).to(cluster, process="autostop_env") + ).to(cluster, process=process) # reset the autostop to the original value set_autostop_via_cluster_obj(original_autostop) assert get_autostop() == original_autostop @pytest.mark.level("minimal") def test_autostop_register_activity(self, cluster): - rh.env( - working_dir="local:./", reqs=["pytest", "pandas"], name="autostop_env" - ).to(cluster) + process = cluster.ensure_process_created("autostop_env") register_activity = rh.fn(register_activity_from_on_cluster).to( - cluster, process="autostop_env" + cluster, process=process ) get_last_active = rh.fn(get_last_active_time_from_on_cluster).to( - cluster, process="autostop_env" + cluster, process=process ) register_activity() diff --git a/tests/test_resources/test_data/test_package.py b/tests/test_resources/test_data/test_package.py index 43a6427f9..349f3a7ed 100644 --- a/tests/test_resources/test_data/test_package.py +++ b/tests/test_resources/test_data/test_package.py @@ -154,9 +154,10 @@ def test_local_reqs_on_cluster(self, cluster, local_package): @pytest.mark.level("local") def test_local_package_version_gets_installed(self, cluster): run_with_logs("pip install beautifulsoup4==4.11.1") - env = rh.env(name="temp_env", reqs=["beautifulsoup4"]).to(cluster) + cluster.install_packages(["beautifulsoup4"]) - remote_fn = rh.function(get_bs4_version).to(cluster, process=env.name) + process = cluster.ensure_process_created("temp_env") + remote_fn = rh.function(get_bs4_version).to(cluster, process=process) assert remote_fn() == "4.11.1" # --------- basic torch index-url testing --------- diff --git a/tests/test_resources/test_modules/test_functions/test_function.py b/tests/test_resources/test_modules/test_functions/test_function.py index 49eee9dc1..f856ea53f 100644 --- a/tests/test_resources/test_modules/test_functions/test_function.py +++ b/tests/test_resources/test_modules/test_functions/test_function.py @@ -140,10 +140,11 @@ def test_get_function_history(self, cluster, test_rns_folder): assert history @pytest.mark.level("local") - def test_function_in_new_env_with_multiprocessing(self, cluster): - numpy_env = rh.env(reqs=["numpy"], name="numpy_env").to(cluster) + def test_function_in_new_process_with_multiprocessing(self, cluster): + cluster.install_packages(["numpy"]) + new_process = cluster.ensure_process_created("numpy_process") multiproc_remote_sum = rh.function(multiproc_np_sum, name="test_function").to( - cluster, process=numpy_env.name + cluster, process=new_process ) summands = [[1, 3], [2, 4], [3, 
5]] @@ -189,36 +190,7 @@ def test_remotes(self, cluster): pid_res = pid_blob.fetch() assert pid_res > 0 - @pytest.mark.skip("Install is way too heavy, choose a lighter example") - @pytest.mark.level("local") - def test_function_git_fn(self, cluster): - remote_parse = rh.function( - fn="https://github.com/huggingface/diffusers/blob/" - "main/examples/dreambooth/train_dreambooth.py:parse_args", - system=cluster, - env=[ - "torch==1.12.1 --verbose", - "torchvision==0.13.1", - "transformers", - "datasets", - "evaluate", - "accelerate", - "pip:./diffusers --verbose", - ], - ) - args = remote_parse( - input_args=[ - "--pretrained_model_name_or_path", - "stabilityai/stable-diffusion-2-base", - "--instance_data_dir", - "remote_image_dir", - "--instance_prompt", - "a photo of sks person", - ] - ) - assert ( - args.pretrained_model_name_or_path == "stabilityai/stable-diffusion-2-base" - ) + # TODO - test git function @pytest.mark.skip("Fix .run following local daemon refactor.") @pytest.mark.level("local") @@ -544,7 +516,7 @@ async def test_returns_coroutine(self, cluster): assert await future_module == 5 @pytest.mark.level("local") - def test_send_function_to_fresh_env(self, cluster): - env = rh.env(name="fresh_env", reqs=["numpy"]) - summer_remote = rh.function(summer).to(cluster, process=env.name) + def test_send_function_to_fresh_process(self, cluster): + process = cluster.ensure_process_created("fresh_process") + summer_remote = rh.function(summer).to(cluster, process=process) summer_remote(2, 3) diff --git a/tests/test_resources/test_modules/test_module.py b/tests/test_resources/test_modules/test_module.py index d7b48258d..a34580670 100644 --- a/tests/test_resources/test_modules/test_module.py +++ b/tests/test_resources/test_modules/test_module.py @@ -117,7 +117,7 @@ async def cpu_count_async(self): class ModuleConstructingOtherModule: - def construct_and_get_env(self): + def construct_and_get_process(self): remote_calc = rh.module(Calculator)() return remote_calc.process @@ -918,12 +918,9 @@ async def test_returns_coroutine(self, cluster): @pytest.mark.level("local") def test_construct_module_on_cluster(self, cluster): - env = rh.env( - name="test_env", - reqs=["pandas", "numpy"], - ).to(cluster) + process = cluster.ensure_process_created("test_env") remote_constructor_module = rh.module(ConstructorModule)().to( - cluster, process=env.name + cluster, process=process ) remote_constructor_module.construct_module_on_cluster() @@ -945,7 +942,7 @@ def test_import_editable_package(self, cluster, installed_editable_package): cluster.run("pip uninstall -y test_fake_package") @pytest.mark.level("local") - def test_import_editable_package_from_new_env( + def test_import_editable_package_from_new_process( self, cluster, installed_editable_package_copy ): importlib_reload(site) @@ -959,9 +956,9 @@ def test_import_editable_package_from_new_env( ) # Now send this to the remote cluster and test that it can still be imported and used - env = rh.env(name="fresh_env", reqs=["numpy"]).to(cluster) + process = cluster.ensure_process_created("fresh_process") remote_editable_package_module = rh.module(TestModuleFromPackage).to( - cluster, process=env.name + cluster, process=process ) assert ( remote_editable_package_module.hello_world() @@ -969,15 +966,12 @@ def test_import_editable_package_from_new_env( ) @pytest.mark.level("local") - def test_module_constructed_on_cluster_is_in_same_env(self, cluster): - env = rh.env( - name="special_env", - reqs=["pandas", "numpy"], - ).to(cluster) + def 
test_module_constructed_on_cluster_is_in_same_process(self, cluster): + process = cluster.ensure_process_created("special_process") remote_module = rh.module(ModuleConstructingOtherModule).to( - system=cluster, process=env.name + system=cluster, process=process ) - assert remote_module.construct_and_get_env() == env.name + assert remote_module.construct_and_get_process() == process @pytest.mark.level("local") def test_logs_stream_in_nested_call(self, cluster): @@ -985,13 +979,14 @@ def test_logs_stream_in_nested_call(self, cluster): RemoteClass = rh.module(SlowNumpyArray).to(cluster) remote_instance = RemoteClass(size=size, name="remote_instance1") - # TODO test in same env (works as of 4-Nov-24) + # TODO test in same process (works as of 4-Nov-24) # remote_helper_call = rh.function(nested_call_logs_stream_helper).to(cluster) - # Send to different env - helper_env = rh.env(name="helper_env", reqs=["pandas", "numpy"]).to(cluster) + # Send to different process + + helper_process = cluster.ensure_process_created("helper_process") remote_helper_call = rh.function(nested_call_logs_stream_helper).to( - cluster, process=helper_env.name + cluster, process=helper_process ) # TODO test with slow_iter call because not working with generator as of 4-Nov-24 diff --git a/tests/test_resources/test_modules/test_server_modules/dont_test_server_module.py b/tests/test_resources/test_modules/test_server_modules/dont_test_server_module.py index ca326a847..a801ab1ae 100644 --- a/tests/test_resources/test_modules/test_server_modules/dont_test_server_module.py +++ b/tests/test_resources/test_modules/test_server_modules/dont_test_server_module.py @@ -19,9 +19,7 @@ @pytest.mark.level("local") async def test_asgi_server(cluster): - fast_api_module = rh.asgi(app).to( - cluster, env=["pytest", "requests"], name="fast_api_module" - ) + fast_api_module = rh.asgi(app).to(cluster, name="fast_api_module") assert isinstance(fast_api_module, rh.Asgi) assert fast_api_module.summer(1, 2) == 3 diff --git a/tests/test_resources/test_resource.py b/tests/test_resources/test_resource.py index bf835a3ad..cfa232860 100644 --- a/tests/test_resources/test_resource.py +++ b/tests/test_resources/test_resource.py @@ -273,7 +273,7 @@ def test_sharing_org_resources( resource_name = f"/{TEST_ORG}/summer_func" args = {"name": resource_name, "fn": summer} - # Sending function to the cluster will save the function and associated env under the organization + # Sending function to the cluster will save the function under the organization f = rh.function(**args).to(docker_cluster_pk_ssh_den_auth) init_args[id(f)] = args diff --git a/tests/test_resources/test_secrets/test_secret.py b/tests/test_resources/test_secrets/test_secret.py index dec4881f3..c33d9edc0 100644 --- a/tests/test_resources/test_secrets/test_secret.py +++ b/tests/test_resources/test_secrets/test_secret.py @@ -4,6 +4,7 @@ import runhouse as rh +from runhouse.constants import DEFAULT_PROCESS_NAME from runhouse.globals import rns_client import tests.test_resources.test_resource @@ -136,7 +137,7 @@ def test_provider_secret_to_cluster_path(self, secret, cluster): assert_delete_local(secret, contents=delete_contents) @pytest.mark.level("local") - def test_provider_secret_to_cluster_env(self, secret, cluster): + def test_provider_secret_to_cluster_process(self, secret, cluster): if not isinstance(secret, rh.ProviderSecret): return @@ -144,11 +145,10 @@ def test_provider_secret_to_cluster_env(self, secret, cluster): if not env_vars: return - env = rh.env() get_remote_val = 
rh.function(_get_env_var_value, name="get_env_vars").to( cluster ) - secret.to(cluster, process=env.name) + secret.to(cluster, process=DEFAULT_PROCESS_NAME) for (key, val) in env_vars.items(): assert get_remote_val(val) == secret.values[key] diff --git a/tests/test_servers/conftest.py b/tests/test_servers/conftest.py index f63b747a3..edfa7a428 100644 --- a/tests/test_servers/conftest.py +++ b/tests/test_servers/conftest.py @@ -133,7 +133,7 @@ def obj_store(request): servlet_name = request.param _, test_obj_store = get_ray_servlet_and_obj_store(servlet_name) - # Clears everything, not just what's in this env servlet + # Clears everything, not just what's in this servlet test_obj_store.clear() yield test_obj_store diff --git a/tests/test_servers/test_server_obj_store.py b/tests/test_servers/test_server_obj_store.py index 3bda63be7..bd132b95a 100644 --- a/tests/test_servers/test_server_obj_store.py +++ b/tests/test_servers/test_server_obj_store.py @@ -10,13 +10,13 @@ def list_compare(list1, list2): return sorted(list1) == sorted(list2) -ENV_NAME_OBJ_STORE = "test_obj_store" +OBJ_STORE_NAME = "test_obj_store" @pytest.mark.servertest -@pytest.mark.parametrize("obj_store", [ENV_NAME_OBJ_STORE], indirect=True) +@pytest.mark.parametrize("obj_store", [OBJ_STORE_NAME], indirect=True) class TestObjStore: - """Start object store in a local base env servlet""" + """Start object store in a local base servlet""" @pytest.mark.level("unit") @pytest.mark.parametrize("key", ["k1", 123]) @@ -151,8 +151,8 @@ def test_many_servlets(self, obj_store): assert obj_store_2.get_servlet_name_for_key("k2") == obj_store_2.servlet_name assert obj_store_2.get_servlet_name_for_key("k3") == obj_store_2.servlet_name - # Technically, "k1" is only present on the base env servlet, - # and "k2" and "k3" are only present on the other env servlet + # Technically, "k1" is only present on the base servlet, + # and "k2" and "k3" are only present on the other servlet # These methods are static, we can run them from either store assert obj_store.keys_for_servlet_name(obj_store.servlet_name) == ["k1"] assert obj_store.get_from_servlet_name(obj_store.servlet_name, "k1") == "v1" @@ -271,7 +271,7 @@ def test_many_servlets(self, obj_store): assert obj_store.get_from_servlet_name(obj_store.servlet_name, "k1") == "v1" assert obj_store.keys_for_servlet_name(obj_store_2.servlet_name) == [] - # Testing of maintaining envs + # Testing of maintaining processes _, obj_store_3 = get_ray_servlet_and_obj_store("third") assert obj_store_3.keys() == ["k1"] obj_store_3.put("k2", "v2") @@ -317,7 +317,7 @@ def test_delete_servlet(self, obj_store): @pytest.mark.servertest -@pytest.mark.parametrize("obj_store", [ENV_NAME_OBJ_STORE], indirect=True) +@pytest.mark.parametrize("obj_store", [OBJ_STORE_NAME], indirect=True) class TestAuthCacheObjStore: """Start object store in a local auth cache servlet""" diff --git a/tests/utils.py b/tests/utils.py index 49cd1a4ea..fbbc26a17 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,13 +12,14 @@ import runhouse as rh import yaml +from runhouse.constants import DEFAULT_PROCESS_NAME from runhouse.globals import rns_client from runhouse.resources.hardware.utils import ClusterStatus, RunhouseDaemonStatus from runhouse.servers.http.http_utils import CreateProcessParams from runhouse.servers.obj_store import get_cluster_servlet, ObjStore, RaySetupOption -from tests.constants import TESTING_AUTOSTOP_INTERVAL, TESTING_LOG_LEVEL +from tests.constants import TEST_ENV_VARS, TEST_REQS def 
get_ray_servlet_and_obj_store(env_name): @@ -156,23 +157,16 @@ def org_friend_account(new_username: str, token: str, original_username: str): rns_client.load_account_from_file() -def test_env(logged_in=False): - return rh.env( - reqs=["pytest", "httpx", "pytest_asyncio", "pandas", "numpy<=1.26.4"], - working_dir=None, - env_vars={ - "RH_LOG_LEVEL": os.getenv("RH_LOG_LEVEL") or TESTING_LOG_LEVEL, - "RH_AUTOSTOP_INTERVAL": str( - os.getenv("RH_AUTOSTOP_INTERVAL") or TESTING_AUTOSTOP_INTERVAL - ), - }, - setup_cmds=[ - f"mkdir -p ~/.rh; touch ~/.rh/config.yaml; " - f"echo '{yaml.safe_dump(rh.configs.defaults_cache)}' > ~/.rh/config.yaml" - ] - if logged_in - else False, - ) +def setup_test_base(cluster, logged_in=False): + setup_cmds = [ + f"mkdir -p ~/.rh; touch ~/.rh/config.yaml; " + f"echo '{yaml.safe_dump(rh.configs.defaults_cache)}' > ~/.rh/config.yaml" + ] + + cluster.install_packages(TEST_REQS) + cluster.set_process_env_vars(DEFAULT_PROCESS_NAME, TEST_ENV_VARS) + if logged_in: + cluster.run(setup_cmds) def remove_config_keys(config, keys_to_skip):