25 changes: 13 additions & 12 deletions runhouse/cli_utils.py
@@ -226,18 +226,19 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste
envs_in_cluster_headline = "Serving 🍦 :"
console.print(envs_in_cluster_headline)

env_resource_mapping = {
env: servlet_processes[env]["env_resource_mapping"] for env in servlet_processes
process_resource_mapping = {
env: servlet_processes[env]["process_resource_mapping"]
for env in servlet_processes
}

if len(env_resource_mapping) == 0:
if len(process_resource_mapping) == 0:
console.print("This cluster has no environment nor resources.")

first_envs_to_print = []

# First: if the default env does not have resources, print it.
default_process_name = DEFAULT_PROCESS_NAME
if len(env_resource_mapping[default_process_name]) <= 1:
if len(process_resource_mapping[default_process_name]) <= 1:
# case where the default env doesn't have any other resources, apart from the default env itself.
console.print(f"{BULLET_UNICODE} {default_process_name}")
console.print(
@@ -253,11 +254,11 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste
# (the only resource they have is a runhouse.env, which is the env itself).
first_envs_to_print = first_envs_to_print + [
env_name
for env_name in env_resource_mapping
for env_name in process_resource_mapping
if (
len(env_resource_mapping[env_name]) <= 1
len(process_resource_mapping[env_name]) <= 1
and env_name != default_process_name
and env_resource_mapping[env_name]
and process_resource_mapping[env_name]
)
]
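For reference, a minimal sketch of the filtering above with an invented mapping (processes whose mapping contains only themselves are queued to print first; the default process is handled separately):

```python
# Illustrative input: each process maps resource names to resource metadata.
process_resource_mapping = {
    "default_process": {"default_process": {}},
    "worker_a": {"worker_a": {}, "my_fn": {}},
    "worker_b": {"worker_b": {}},
}

first_to_print = [
    name
    for name, resources in process_resource_mapping.items()
    if len(resources) <= 1 and name != "default_process" and resources
]
print(first_to_print)  # ['worker_b'] -- worker_a holds a real resource, so it prints later
```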

@@ -267,19 +268,19 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste
# * Else, we will print the resources (rh.function, rh.module) associated with the env.
envs_to_print = first_envs_to_print + [
env_name
for env_name in env_resource_mapping
for env_name in process_resource_mapping
if env_name not in first_envs_to_print + [default_process_name]
]

for env_name in envs_to_print:
resources_in_env = env_resource_mapping[env_name]
resources_in_env = process_resource_mapping[env_name]
env_process_info = servlet_processes[env_name]

env_name_txt = f"{BULLET_UNICODE} {env_name} | pid: {env_process_info['pid']} | node: {env_process_info['node_name']}"
console.print(env_name_txt)

# Print CPU info
env_cpu_info = env_process_info.get("env_cpu_usage")
env_cpu_info = env_process_info.get("process_cpu_usage")
if env_cpu_info:

# convert bytes to GB
@@ -306,7 +307,7 @@ def print_envs_info(servlet_processes: Dict[str, Dict[str, Any]], current_cluste
console.print(cpu_usage_summary)

# Print GPU info
env_gpu_info = env_process_info.get("env_gpu_usage")
env_gpu_info = env_process_info.get("process_gpu_usage")

# sometimes the cluster has no GPU, in which case env_gpu_info is an empty dictionary.
if env_gpu_info:
@@ -436,7 +437,7 @@ def print_status(status_data: dict, current_cluster) -> None:
from runhouse.main import console

cluster_config = status_data.get("cluster_config")
servlet_processes = status_data.get("env_servlet_processes")
servlet_processes = status_data.get("servlet_processes")

cluster_name = cluster_config.get("name", None)
if cluster_name:
4 changes: 2 additions & 2 deletions runhouse/resources/functions/function.py
@@ -69,11 +69,11 @@ def to(
name: Optional[str] = None,
):
"""
Send the function to the specified env on the cluster. This will sync over relevant code and packages
Send the function to the specified process on the cluster. This will sync over relevant code and packages
onto the cluster, and set up the environment if it does not yet exist on the cluster.

Args:
system (str or Cluster): The system to setup the function and env on.
system (str or Cluster): The system to set up the function and process on.
process (str or Dict, optional): The process to run the module on. If it's a Dict, the process
will be explicitly created with those args. (Default: ``None``)
name (Optional[str], optional): Name to give to the function resource, if you wish to rename it.
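For context, a minimal usage sketch of the renamed parameter (the cluster and process names are hypothetical, not taken from this PR):

```python
import runhouse as rh

def add(a: int, b: int) -> int:
    return a + b

# Hypothetical cluster; any existing Cluster object or cluster name should work.
cluster = rh.cluster(name="my-cluster")

# Send the function to a named process on the cluster (formerly the `env` argument).
remote_add = rh.function(add).to(system=cluster, process="my_process")
remote_add(1, 2)  # executes remotely on the cluster
```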
2 changes: 1 addition & 1 deletion runhouse/resources/functions/function_factory.py
@@ -17,7 +17,7 @@ def function(
dryrun: bool = False,
serialize_notebook_fn: bool = False,
):
"""runhouse.function(fn: str | Callable | None = None, name: str | None = None, system: str | Cluster | None = None, env: str | List[str] | Env | None = None, dryrun: bool = False, load_secrets: bool = False, serialize_notebook_fn: bool = False)
"""runhouse.function(fn: str | Callable | None = None, name: str | None = None, system: str | Cluster | None = None, dryrun: bool = False, serialize_notebook_fn: bool = False)

Builds an instance of :class:`Function`.

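A short sketch of the corrected factory signature in use (the function and name are illustrative; `dryrun=True` avoids needing a live system):

```python
import runhouse as rh

def preprocess(text: str) -> str:
    return text.strip().lower()

# Build a Function instance per the signature above; note there is no `env` argument anymore.
fn = rh.function(fn=preprocess, name="preprocess_fn", dryrun=True)
```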
10 changes: 5 additions & 5 deletions runhouse/resources/hardware/cluster.py
@@ -608,7 +608,7 @@ def _sync_image_to_cluster(self):
env_vars = {}
log_level = os.getenv("RH_LOG_LEVEL")
if log_level:
# add log level to the default env to ensure it gets set on the cluster when the server is restarted
# add log level to the default image to ensure it gets set on the cluster when the server is restarted
env_vars["RH_LOG_LEVEL"] = log_level
logger.info(f"Using log level {log_level} on cluster's default env")

@@ -813,19 +813,19 @@ def put_resource(
process (str, optional): Process of the object store to put the object in. (Default: ``None``)
"""
# Logic to get the process name from the different ways it can be provided
env_name = process or (
process = process or (
resource.process if hasattr(resource, "process") else DEFAULT_PROCESS_NAME
)

state = state or {}
if self.on_this_cluster():
data = (resource.config(condensed=False), state, dryrun)
return obj_store.put_resource(serialized_data=data, process=env_name)
return obj_store.put_resource(serialized_data=data, process=process)
return self.call_client_method(
"put_resource",
resource,
state=state or {},
process=env_name,
process=process,
dryrun=dryrun,
)
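The renamed resolution above follows a simple precedence; a standalone restatement as a sketch (the helper name is invented, and the default constant mirrors `DEFAULT_PROCESS_NAME`):

```python
DEFAULT_PROCESS_NAME = "default_process"  # assumed value, for illustration only

def resolve_process_name(process, resource):
    # 1) An explicit `process` argument wins;
    # 2) otherwise fall back to the resource's own `process` attribute;
    # 3) otherwise use the cluster's default process.
    return process or getattr(resource, "process", DEFAULT_PROCESS_NAME)
```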

@@ -987,7 +987,7 @@ def status(self, send_to_den: bool = False):
elif den_resp_status_code != 200:
logger.warning("Failed to send cluster status to Den")

if not configs.observability_enabled and status.get("env_servlet_processes"):
if not configs.observability_enabled and status.get("servlet_processes"):
logger.warning(
"Cluster observability is disabled. Metrics are stale and will "
"no longer be collected. To re-enable observability, please "
2 changes: 1 addition & 1 deletion runhouse/rns/rns_client.py
@@ -32,7 +32,7 @@ class ResourceStatusData(BaseModel):
server_gpu_utilization: Optional[float]
server_memory_usage: Dict[str, Any]
server_gpu_usage: Optional[Dict[str, Any]]
env_servlet_processes: Dict[str, Dict[str, Any]]
servlet_processes: Dict[str, Dict[str, Any]]
server_pid: int
runhouse_version: str

34 changes: 17 additions & 17 deletions runhouse/servers/cluster_servlet.py
@@ -256,7 +256,7 @@ async def aget_servlet_name_for_key(self, key: Any) -> str:
async def aput_servlet_name_for_key(self, key: Any, servlet_name: str):
if not await self.ais_servlet_name_initialized(servlet_name):
raise ValueError(
f"Env servlet name {servlet_name} not initialized, and you tried to mark a resource as in it."
f"Servlet name {servlet_name} not initialized, and you tried to mark a resource as in it."
)
self._key_to_servlet_name[key] = servlet_name

@@ -268,16 +268,16 @@ async def aclear_key_to_servlet_name_dict(self):
self._key_to_servlet_name = {}

##############################################
# Remove Env Servlet
# Remove Servlet
##############################################
async def aclear_all_references_to_servlet_name(self, servlet_name: str):
# using lock to prevent status thread access self._initialized_servlet_args before the env is deleted.
# using lock to prevent the status thread from accessing self._initialized_servlet_args before the process is deleted.
with self.lock:
del self._initialized_servlet_args[servlet_name]
deleted_keys = [
key
for key, env in self._key_to_servlet_name.items()
if env == servlet_name
for key, process in self._key_to_servlet_name.items()
if process == servlet_name
]
for key in deleted_keys:
self._key_to_servlet_name.pop(key)
@@ -292,15 +292,15 @@ async def asave_status_metrics_to_den(self, status: dict):
# making a copy so the status won't be modified by pop, since it will be returned after sending to Den
# (status is passed by reference).
status_copy = copy.deepcopy(status)
servlet_processes = status_copy.pop("env_servlet_processes")
servlet_processes = status_copy.pop("servlet_processes")

status_data = {
"daemon_status": RunhouseDaemonStatus.RUNNING,
"resource_type": status_copy.get("cluster_config").pop(
"resource_type", "cluster"
),
"resource_info": status_copy,
"env_servlet_processes": servlet_processes,
"servlet_processes": servlet_processes,
}

client = httpx.AsyncClient()
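For context, a minimal sketch of an async status POST with httpx (the endpoint, headers, and payload here are assumptions, not Den's actual API):

```python
import httpx

async def post_status(url: str, payload: dict, token: str) -> int:
    # Post the status payload and surface HTTP errors to the caller.
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url, json=payload, headers={"Authorization": f"Bearer {token}"}
        )
        resp.raise_for_status()
        return resp.status_code
```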
@@ -486,14 +486,14 @@ async def _update_autostop(self, status: dict):
function_running = any(
any(
len(
resource["env_resource_mapping"][resource_name].get(
resource["process_resource_mapping"][resource_name].get(
"active_function_calls", []
)
)
> 0
for resource_name in resource["env_resource_mapping"].keys()
for resource_name in resource["process_resource_mapping"].keys()
)
for resource in status.get("env_servlet_processes", {}).values()
for resource in status.get("servlet_processes", {}).values()
)
if function_running:
await self.autostop_helper.set_last_active_time_to_now()
@@ -520,7 +520,7 @@ async def _status_for_servlet(self, servlet_name):
}

# Need to catch the exception here because we're running this in a gather,
# and need to know which env servlet failed
# and need to know which servlet failed
except Exception as e:
return {"servlet_name": servlet_name, "Exception": e}

@@ -600,7 +600,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
# Popping out creds because we don't want to show them in the status
config_cluster.pop("creds", None)

# Getting data from each env servlet about the objects it contains and the utilization data
# Getting data from each servlet about the objects it contains and the utilization data
servlet_utilization_data = {}
with self.lock:
servlets_status = await asyncio.gather(
@@ -610,22 +610,22 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
],
)

# Store the data for the appropriate env servlet name
# Store the data for the appropriate servlet name
for env_status in servlets_status:
servlet_name = env_status.get("servlet_name")

# Nothing to store if there was an exception
if "Exception" in env_status.keys():
e = env_status.get("Exception")
logger.warning(
f"Exception {str(e)} in status for env servlet {servlet_name}"
f"Exception {str(e)} in status for servlet {servlet_name}"
)
servlet_utilization_data[servlet_name] = {}

else:
# Store what was in the env and the utilization data
# Store what was in the process and the utilization data
env_memory_info = env_status.get("servlet_utilization_data")
env_memory_info["env_resource_mapping"] = env_status.get(
env_memory_info["process_resource_mapping"] = env_status.get(
"objects_in_servlet"
)
servlet_utilization_data[servlet_name] = env_memory_info
@@ -672,7 +672,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
"cluster_config": config_cluster,
"runhouse_version": runhouse.__version__,
"server_pid": self.pid,
"env_servlet_processes": servlet_utilization_data,
"servlet_processes": servlet_utilization_data,
"server_cpu_utilization": cpu_utilization,
"server_gpu_utilization": gpu_utilization,
"server_memory_usage": memory_usage,
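Putting the renames together, an illustrative shape of the payload `astatus` assembles (all values are made up; the per-process keys follow the servlet's `_status_local_helper` below):

```python
status = {
    "cluster_config": {"name": "my-cluster", "resource_type": "cluster"},
    "runhouse_version": "0.0.0",  # placeholder
    "server_pid": 1000,
    "servlet_processes": {
        "default_process": {
            "pid": 1234,
            "node_ip": "10.0.0.1",
            "node_name": "worker-0",
            "node_index": 0,
            "process_cpu_usage": {},  # schema assumed
            "process_gpu_usage": {},  # empty on CPU-only clusters
            "process_resource_mapping": {"default_process": {}},
        }
    },
    "server_cpu_utilization": 12.5,
    "server_gpu_utilization": None,
    "server_memory_usage": {},  # schema assumed
}
```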
2 changes: 1 addition & 1 deletion runhouse/servers/http/http_server.py
@@ -163,7 +163,7 @@ async def ainitialize(
),
)

# We initialize a default env servlet where some things may run.
# We initialize a default servlet where some things may run.
_ = obj_store.get_servlet(
name=DEFAULT_PROCESS_NAME,
create_process_params=CreateProcessParams(
25 changes: 13 additions & 12 deletions runhouse/servers/servlet.py
@@ -107,8 +107,7 @@ async def __init__(
# Ray defaults to setting OMP_NUM_THREADS to 1, which unexpectedly limits parallelism in user programs.
# We delete it by default, but if we find that the user explicitly set it to another value, we respect that.
# This is really only a factor if the user set the value inside the VM or container, or inside the base_env
# which a cluster was initialized with. If they set it inside the env constructor and the env was sent to the
# cluster normally with .to, it will be set after this point.
# which a cluster was initialized with.
# TODO this had no effect when we did it below where we set CUDA_VISIBLE_DEVICES, so we may need to move that
# here and mirror the same behavior (setting it based on the detected gpus in the whole cluster may not work
# for multinode, but popping it may also break things, it needs to be tested).
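A minimal sketch of the behavior the comment above describes (assuming Ray's default shows up as the string "1"; this is not the servlet's exact code):

```python
import os

# Drop Ray's default OMP_NUM_THREADS=1 so user programs aren't silently
# single-threaded, but respect any value the user set explicitly.
if os.environ.get("OMP_NUM_THREADS") == "1":
    del os.environ["OMP_NUM_THREADS"]
```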
@@ -132,7 +131,7 @@
if is_gpu_cluster():
logger.debug("Creating _periodic_gpu_check thread.")
collect_gpu_thread = threading.Thread(
target=self._collect_env_gpu_usage, daemon=True
target=self._collect_process_gpu_usage, daemon=True
)
collect_gpu_thread.start()

@@ -250,7 +249,7 @@ async def adelete_local(self, key: Any):
async def aclear_local(self):
return await obj_store.aclear_local()

def _get_env_cpu_usage(self, cluster_config: dict = None):
def _get_process_cpu_usage(self, cluster_config: dict = None):

total_memory = psutil.virtual_memory().total
node_ip = get_node_ip()
@@ -292,7 +291,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None):
node_index,
)

def _get_env_gpu_usage(self):
def _get_process_gpu_usage(self):
# currently works correctly for a single-node GPU cluster. Multinode clusters will be supported shortly.

collected_gpus_info = copy.deepcopy(self.gpu_metrics)
@@ -304,8 +303,8 @@ def _get_env_gpu_usage(self):
collected_gpus_info=collected_gpus_info, servlet_type=ServletType.env
)

def _collect_env_gpu_usage(self):
"""periodically collects env gpu usage"""
def _collect_process_gpu_usage(self):
"""periodically collects gpu usage"""

pynvml.nvmlInit() # init nvidia ml info collection
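For context, a minimal sketch of a daemonized pynvml collector like the one referenced above (the interval and metric choice are assumptions):

```python
import threading
import time

import pynvml

def collect_gpu_usage(interval_s: int = 10):
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # first GPU only
    while True:
        mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
        print(f"GPU memory used: {mem.used / 1024 ** 3:.2f} GB")
        time.sleep(interval_s)

threading.Thread(target=collect_gpu_usage, daemon=True).start()
```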

@@ -366,11 +365,13 @@ def _status_local_helper(self):
servlet_pid,
node_ip,
node_index,
) = self._get_env_cpu_usage(cluster_config)
) = self._get_process_cpu_usage(cluster_config)

# Try loading GPU data (if relevant)
env_gpu_usage = (
self._get_env_gpu_usage() if cluster_config.get("has_cuda", False) else {}
process_gpu_usage = (
self._get_process_gpu_usage()
if cluster_config.get("has_cuda", False)
else {}
)

cluster_config = obj_store.cluster_config
Expand All @@ -391,11 +392,11 @@ def _status_local_helper(self):
self.gpu_metrics = None

servlet_utilization_data = {
"env_gpu_usage": env_gpu_usage,
"process_gpu_usage": process_gpu_usage,
"node_ip": node_ip,
"node_name": node_name,
"node_index": node_index,
"env_cpu_usage": env_memory_usage,
"process_cpu_usage": env_memory_usage,
"pid": servlet_pid,
}

2 changes: 2 additions & 0 deletions tests/constants.py
@@ -12,3 +12,5 @@
os.getenv("RH_AUTOSTOP_INTERVAL") or TESTING_AUTOSTOP_INTERVAL
),
}

TEST_REQS = ["pytest", "httpx", "pytest_asyncio", "pandas", "numpy<=1.26.4"]