Skip to content

[k8s] Better error message for stale jobs controller #5274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sky/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ class ClusterDoesNotExist(ValueError):
pass


class CachedClusterUnavailable(Exception):
    """Signals that a locally cached cluster record cannot be used.

    Raised when a cluster known from the local cache (e.g. the jobs
    controller) cannot be reached or refreshed.
    """


class NotSupportedError(Exception):
    """Signals that the requested feature is not supported."""
Expand Down
34 changes: 31 additions & 3 deletions sky/jobs/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sky import core
from sky import exceptions
from sky import execution
from sky import global_user_state
from sky import provision as provision_lib
from sky import sky_logging
from sky import task as task_lib
Expand Down Expand Up @@ -64,6 +65,8 @@ def launch(
ValueError: cluster does not exist. Or, the entrypoint is not a valid
chain dag.
sky.exceptions.NotSupportedError: the feature is not supported.
sky.exceptions.CachedClusterUnavailable: cached jobs controller cluster
is unavailable

Returns:
job_id: Optional[int]; the job ID of the submitted job. None if the
Expand Down Expand Up @@ -103,6 +106,31 @@ def launch(
with rich_utils.safe_status(
ux_utils.spinner_message('Initializing managed job')):

# Check whether cached jobs controller cluster is accessible
cluster_name = (
controller_utils.Controllers.JOBS_CONTROLLER.value.cluster_name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity — this seems like a constant; is that intentional?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are talking about the Controllers.JOBS_CONTROLLER enum, then yes I believe this is a fixed value - one controller node per user. The purpose of the PR is to check if the controller is actually alive and well, so I use that specific cluster_name to query the cluster.

record = global_user_state.get_cluster_from_name(cluster_name)
if record is not None:
# there is a cached jobs controller cluster
try:
# TODO: do something with returned status?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you intend to address this TODO in this PR or is this a general comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a general comment. Currently, we don't use the return value because it is not needed here.

_, _ = backend_utils.refresh_cluster_status_handle(
cluster_name=cluster_name,
force_refresh_statuses=set(status_lib.ClusterStatus),
acquire_per_cluster_status_lock=False)
except (exceptions.ClusterOwnerIdentityMismatchError,
exceptions.CloudUserIdentityError,
exceptions.ClusterStatusFetchingError) as e:
# we weren't able to refresh the cluster for its status.
with ux_utils.print_exception_no_traceback():
raise exceptions.CachedClusterUnavailable(
f'Cached jobs controller cluster '
f'{cluster_name} cannot be refreshed. Please check if '
'the cluster is accessible. If the cluster was '
'removed, consider removing the cluster from SkyPilot '
f'with:\n\n`sky down {cluster_name} --purge`\n\n'
f'Reason: {common_utils.format_exception(e)}')

local_to_controller_file_mounts = {}

if storage_lib.get_cached_enabled_storage_cloud_names_or_refresh():
Expand Down Expand Up @@ -142,11 +170,11 @@ def launch(
remote_user_config_path = f'{prefix}/{dag.name}-{dag_uuid}.config_yaml'
remote_env_file_path = f'{prefix}/{dag.name}-{dag_uuid}.env'
controller_resources = controller_utils.get_controller_resources(
controller=controller_utils.Controllers.JOBS_CONTROLLER,
controller=controller,
task_resources=sum([list(t.resources) for t in dag.tasks], []))
controller_idle_minutes_to_autostop, controller_down = (
controller_utils.get_controller_autostop_config(
controller=controller_utils.Controllers.JOBS_CONTROLLER))
controller=controller))

vars_to_fill = {
'remote_user_yaml_path': remote_user_yaml_path,
Expand All @@ -162,7 +190,7 @@ def launch(
'dashboard_setup_cmd': managed_job_constants.DASHBOARD_SETUP_CMD,
'dashboard_user_id': common.SERVER_ID,
**controller_utils.shared_controller_vars_to_fill(
controller_utils.Controllers.JOBS_CONTROLLER,
controller,
remote_user_config_path=remote_user_config_path,
local_user_config=mutated_user_config,
),
Expand Down