Skip to content

Commit 1905343

Browse files
Alexandra BelousovAlexandra Belousov
authored and committed
raise node departure exceptions - local-launched clusters
1 parent 21f228f commit 1905343

File tree

6 files changed

+110
-52
lines changed

6 files changed

+110
-52
lines changed

runhouse/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,4 @@
117117
SSH_SKY_SECRET_NAME = "ssh-sky-key"
118118

119119
INSUFFICIENT_DISK_MSG = "No space left on device"
120+
CONNECTION_REFUSED_MSG = "Connection refused"

runhouse/exceptions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,16 @@ def __init__(
2525
)
2626
msg = f"{msg}. To resolve it, teardown the cluster and re-launch it with larger disk size."
2727
super().__init__(msg)
28+
29+
30+
class ClusterTerminated(Exception):
    """Raised when we try to connect to a terminated cluster.

    The exception message is ``error_msg`` (or a generic fallback when ``error_msg`` is empty
    or ``None``) followed by a hint on how to bring the cluster back up. The argument is also
    stored, unmodified, on the instance as ``error_msg``.
    """

    def __init__(
        self,
        error_msg: str = None,
    ) -> None:
        # Keep the caller-supplied text around so handlers can inspect it.
        self.error_msg = error_msg
        reason = error_msg or "Cluster is terminated"
        super().__init__(
            f"{reason}. To resolve it, please bring the cluster up by using the Runhouse's python APIs or CLI commands."
        )

runhouse/resources/hardware/cluster.py

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
get_running_and_not_running_clusters,
2727
get_unsaved_live_clusters,
2828
parse_filters,
29+
RunhouseDaemonStatus,
2930
)
3031

3132
from runhouse.resources.images import Image, ImageSetupStepType
@@ -68,6 +69,7 @@
6869
NUM_PORTS_TO_TRY,
6970
RESERVED_SYSTEM_NAMES,
7071
)
72+
from runhouse.exceptions import ClusterTerminated
7173
from runhouse.globals import configs, obj_store, rns_client
7274
from runhouse.logger import get_logger
7375

@@ -1043,6 +1045,32 @@ def status(self, send_to_den: bool = False):
10431045

10441046
return status
10451047

1048+
def _update_cluster_status_to_terminated_in_den(self):
    """Best-effort report to Den that this cluster's daemon status is terminated.

    Does nothing when the cluster has no ``rns_address``. Otherwise posts a status payload to
    the cluster's status endpoint. A response code other than 200 or 404 is logged as a
    warning, and any exception raised along the way is logged as a warning and not re-raised.
    """
    if not self.rns_address:
        return

    try:
        # Update Den with the terminated status
        # NOTE(review): resource_type is derived from the *parent* of the concrete class, so its
        # value depends on which subclass this is called on — confirm that is intended.
        payload = {
            "daemon_status": RunhouseDaemonStatus.TERMINATED,
            "resource_type": self.__class__.__base__.__name__.lower(),
            "data": {},
        }

        uri = rns_client.format_rns_address(self.rns_address)
        endpoint = f"{rns_client.api_server_url}/resource/{uri}/cluster/status"
        resp = requests.post(
            endpoint,
            json=payload,
            headers=rns_client.request_headers(),
        )

        # Note: 404 means that the cluster is not saved in Den
        if resp.status_code in (200, 404):
            return
        logger.warning("Failed to update Den with terminated cluster status")

    except Exception as e:
        logger.warning(e)
1073+
10461074
def ssh_tunnel(
10471075
self, local_port, remote_port=None, num_ports_to_try: int = 0
10481076
) -> "SshTunnel":
@@ -1488,6 +1516,11 @@ def rsync(
14881516
if not src_node and up and not Path(source).expanduser().exists():
14891517
raise ValueError(f"Could not locate path to sync: {source}.")
14901518

1519+
# The cluster is probably auto-stopped / terminated via CLI while calling cluster.rsync
1520+
if not self.is_up():
1521+
self._update_cluster_status_to_terminated_in_den()
1522+
raise ClusterTerminated()
1523+
14911524
# before syncing the object, making sure there is enough disk space for it. If we don't perform this check in
14921525
# advance, there might be a case where we start rsyncing the object -> during the rsync no disk space will
14931526
# be left (because the object is too big) -> the rsync will get stuck and no results will be returned (even failures),
@@ -1613,14 +1646,19 @@ def rsync(
16131646
logger.info(f"Rsyncing {source} on {node} to {dest}")
16141647
Path(dest).expanduser().parent.mkdir(parents=True, exist_ok=True)
16151648

1616-
runner.rsync(
1617-
source,
1618-
dest,
1619-
up=up,
1620-
filter_options=filter_options,
1621-
stream_logs=stream_logs,
1622-
ignore_existing=ignore_existing,
1623-
)
1649+
try:
1650+
runner.rsync(
1651+
source,
1652+
dest,
1653+
up=up,
1654+
filter_options=filter_options,
1655+
stream_logs=stream_logs,
1656+
ignore_existing=ignore_existing,
1657+
)
1658+
except ClusterTerminated as e:
1659+
if not self._ping():
1660+
self._update_cluster_status_to_terminated_in_den()
1661+
raise e
16241662
return
16251663

16261664
# Case 3: rsync with password between cluster and local (e.g. laptop)
@@ -1636,16 +1674,21 @@ def rsync(
16361674
logger.info(f"Rsyncing {source} on {node} to {dest}")
16371675
Path(dest).expanduser().parent.mkdir(parents=True, exist_ok=True)
16381676

1639-
rsync_cmd = runner.rsync(
1640-
source,
1641-
dest,
1642-
up=up,
1643-
filter_options=filter_options,
1644-
stream_logs=stream_logs,
1645-
return_cmd=True,
1646-
ignore_existing=ignore_existing,
1647-
)
1648-
run_command_with_password_login(rsync_cmd, pwd, stream_logs)
1677+
try:
1678+
rsync_cmd = runner.rsync(
1679+
source,
1680+
dest,
1681+
up=up,
1682+
filter_options=filter_options,
1683+
stream_logs=stream_logs,
1684+
return_cmd=True,
1685+
ignore_existing=ignore_existing,
1686+
)
1687+
run_command_with_password_login(rsync_cmd, pwd, stream_logs)
1688+
except ClusterTerminated as e:
1689+
if not self._ping():
1690+
self._update_cluster_status_to_terminated_in_den()
1691+
raise e
16491692

16501693
def _local_rsync(
16511694
self,
@@ -2153,6 +2196,18 @@ def run_python(
21532196
conda_env_name=conda_env_name,
21542197
)
21552198

2199+
if node == "all":
2200+
status_code = set([return_code[0][0] for return_code in return_codes])
2201+
python_run_failed = len(status_code) > 1 or status_code.pop() != 0
2202+
else:
2203+
status_code = return_codes[0][0]
2204+
python_run_failed = status_code != 0
2205+
if python_run_failed:
2206+
if not self._ping():
2207+
self._update_cluster_status_to_terminated_in_den()
2208+
raise ClusterTerminated()
2209+
raise ValueError(f"Could not run python code on cluster, got {status_code}")
2210+
21562211
return return_codes
21572212

21582213
def create_conda_env(self, conda_env_name: str, conda_config: Dict):

runhouse/resources/hardware/on_demand_cluster.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
from pathlib import Path
88
from typing import Any, Dict, List, Union
99

10-
import requests
11-
1210
import rich.errors
1311

1412
try:
@@ -32,7 +30,6 @@
3230
ClusterStatus,
3331
LauncherType,
3432
pprint_launched_cluster_summary,
35-
RunhouseDaemonStatus,
3633
ServerConnectionType,
3734
up_cluster_helper,
3835
)
@@ -684,29 +681,7 @@ def teardown(self, verbose: bool = True):
684681
LocalLauncher.teardown(cluster=self, verbose=verbose)
685682

686683
if self.rns_address is not None:
687-
try:
688-
# Update Den with the terminated status
689-
status_data = {
690-
"daemon_status": RunhouseDaemonStatus.TERMINATED,
691-
"resource_type": self.__class__.__base__.__name__.lower(),
692-
"data": {},
693-
}
694-
695-
cluster_uri = rns_client.format_rns_address(self.rns_address)
696-
status_resp = requests.post(
697-
f"{rns_client.api_server_url}/resource/{cluster_uri}/cluster/status",
698-
json=status_data,
699-
headers=rns_client.request_headers(),
700-
)
701-
702-
# Note: 404 means that the cluster is not saved in Den
703-
if status_resp.status_code not in [200, 404]:
704-
logger.warning(
705-
"Failed to update Den with terminated cluster status"
706-
)
707-
708-
except Exception as e:
709-
logger.warning(e)
684+
self._update_cluster_status_to_terminated_in_den()
710685

711686
def teardown_and_delete(self, verbose: bool = True):
712687
"""Teardown cluster and delete it from configs.

runhouse/resources/hardware/sky/subprocess_utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
from runhouse.logger import get_logger
66

7-
from runhouse.exceptions import InsufficientDisk
8-
from runhouse.constants import INSUFFICIENT_DISK_MSG
7+
from runhouse.exceptions import InsufficientDisk, ClusterTerminated
8+
from runhouse.constants import INSUFFICIENT_DISK_MSG, CONNECTION_REFUSED_MSG
99

1010
logger = get_logger(__name__)
1111

@@ -50,6 +50,8 @@ def handle_returncode(returncode: int,
5050

5151
if INSUFFICIENT_DISK_MSG in stderr:
5252
raise InsufficientDisk(command=command, error_msg=error_msg)
53+
if CONNECTION_REFUSED_MSG in stderr:
54+
raise ClusterTerminated()
5355
if callable(error_msg):
5456
error_msg = error_msg()
5557
raise CommandError(returncode, command, error_msg, stderr)

runhouse/servers/http/http_client.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import requests
1515

16+
from runhouse.exceptions import ClusterTerminated
1617
from runhouse.globals import rns_client
1718
from runhouse.logger import get_logger
1819

@@ -199,6 +200,12 @@ def request(
199200
headers=headers,
200201
)
201202

203+
def _ping_cluster(self):
    """Run the server check and translate a failed check into ``ClusterTerminated``.

    Calls ``self.check_server()`` and returns ``None`` when it completes without raising.

    Raises:
        ClusterTerminated: if ``check_server`` raises ``ValueError``. The original error is
            attached as ``__cause__``.
    """
    try:
        self.check_server()
    except ValueError as err:
        # Chain explicitly so the underlying failure reason stays visible in the traceback.
        raise ClusterTerminated() from err
208+
202209
def request_json(
203210
self,
204211
endpoint: str,
@@ -250,12 +257,17 @@ def request_json(
250257
)
251258
resp_json = response.json()
252259
if isinstance(resp_json, dict) and "output_type" in resp_json:
253-
return handle_response(
254-
resp_json,
255-
resp_json["output_type"],
256-
err_str,
257-
log_formatter=self.log_formatter,
258-
)
260+
try:
261+
return handle_response(
262+
resp_json,
263+
resp_json["output_type"],
264+
err_str,
265+
log_formatter=self.log_formatter,
266+
)
267+
except ConnectionError:
268+
self._ping_cluster()
269+
except requests.exceptions.ConnectionError:
270+
self._ping_cluster()
259271
return resp_json
260272

261273
def check_server(self):

0 commit comments

Comments
 (0)