Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4441,7 +4441,7 @@ def tail_managed_job_logs(self,
def sync_down_managed_job_logs(
self,
handle: CloudVmRayResourceHandle,
job_id: Optional[int] = None,
job_id: Optional[Union[int, str]] = None,
job_name: Optional[str] = None,
controller: bool = False,
local_dir: str = constants.SKY_LOGS_DIRECTORY) -> Dict[str, str]:
Expand Down Expand Up @@ -4516,7 +4516,7 @@ def sync_down_managed_job_logs(
run_timestamps = {
job_id: f'managed-jobs-consolidation-mode-{job_id}'
}
else:
elif isinstance(job_id, int):
# get the run_timestamp
# the function takes in [job_id]
use_legacy = not handle.is_grpc_enabled_with_flag
Expand Down Expand Up @@ -4552,6 +4552,9 @@ def sync_down_managed_job_logs(
# returns with a dict of {job_id: run_timestamp}
run_timestamps = message_utils.decode_payload(
run_timestamps_payload)
else:
run_timestamps = {job_id: f'process-{job_id}'}

if not run_timestamps:
logger.info(f'{colorama.Fore.YELLOW}'
'No matching log directories found'
Expand Down
29 changes: 27 additions & 2 deletions sky/client/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import typing
from typing import (Any, Callable, Dict, Generator, List, Optional, Set, Tuple,
TypeVar, Union)
import uuid

import click
import colorama
Expand Down Expand Up @@ -5240,11 +5241,35 @@ def jobs_cancel(
is_flag=True,
required=False,
help='Download logs for all jobs shown in the queue.')
@click.argument('job_id', required=False, type=int)
@click.argument('job_id', required=False, type=str)
@usage_lib.entrypoint
def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool,
def jobs_logs(name: Optional[str], job_id: Optional[str], follow: bool,
controller: bool, refresh: bool, sync_down: bool):
"""Tail or sync down the log of a managed job."""
job_id: Optional[Union[int, str]] = job_id # keep mypy happy

if controller and job_id is not None:
try:
if isinstance(job_id, str) and job_id.isdigit():
job_id = int(job_id)
elif isinstance(job_id, str):
parsed_uuid = uuid.UUID(job_id)
job_id = f'controller_{parsed_uuid}'
except (ValueError, TypeError):
# keep same typeerror message as before - just adds "or UUID" to
# the error message
raise click.UsageError( # pylint: disable=raise-missing-from
f'Error: Invalid value for \'[JOB_ID]\': \'{job_id}\' is not '
'a valid integer or UUID.')
elif job_id is not None:
try:
job_id = int(job_id)
except (ValueError, TypeError):
# same error message as before in the non controller case
raise click.UsageError( # pylint: disable=raise-missing-from
f'Error: Invalid value for \'[JOB_ID]\': \'{job_id}\' is not '
'a valid integer.')

try:
if sync_down:
with rich_utils.client_status(
Expand Down
12 changes: 11 additions & 1 deletion sky/dashboard/src/pages/jobs/[job].js
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,17 @@ function JobDetailsContent({
const RECOVERING_STATUSES = ['RECOVERING'];

const isPending = PENDING_STATUSES.includes(jobData.status);
const isPreStart = PRE_START_STATUSES.includes(jobData.status);
// Check if controller is running: either controller_pid is set, or schedule_state
// indicates the controller is alive (not INACTIVE or WAITING)
// After PR #5682, a job can be in PENDING state even though the controller is running
const isControllerRunning =
jobData.controller_pid != null ||
(jobData.schedule_state &&
jobData.schedule_state !== 'INACTIVE' &&
jobData.schedule_state !== 'WAITING');
// Controller is not started if job is in pre-start status AND controller is not running
const isPreStart =
PRE_START_STATUSES.includes(jobData.status) && !isControllerRunning;
const isRecovering = RECOVERING_STATUSES.includes(jobData.status);

const toggleYamlExpanded = () => {
Expand Down
16 changes: 9 additions & 7 deletions sky/jobs/client/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def cancel(
@server_common.check_server_healthy_or_start
@rest.retry_transient_errors()
def tail_logs(name: Optional[str] = None,
job_id: Optional[int] = None,
job_id: Optional[Union[int, str]] = None,
follow: bool = True,
controller: bool = False,
refresh: bool = False,
Expand Down Expand Up @@ -386,11 +386,12 @@ def tail_logs(name: Optional[str] = None,
@usage_lib.entrypoint
@server_common.check_server_healthy_or_start
def download_logs(
name: Optional[str],
job_id: Optional[int],
refresh: bool,
controller: bool,
local_dir: str = constants.SKY_LOGS_DIRECTORY) -> Dict[int, str]:
name: Optional[str],
job_id: Optional[Union[int, str]],
refresh: bool,
controller: bool,
local_dir: str = constants.SKY_LOGS_DIRECTORY
) -> Dict[Union[int, str], str]:
"""Sync down logs of managed jobs.

Please refer to sky.cli.job_logs for documentation.
Expand Down Expand Up @@ -428,7 +429,8 @@ def download_logs(
remote2local_path_dict = client_common.download_logs_from_api_server(
job_id_remote_path_dict.values())
return {
int(job_id): remote2local_path_dict[remote_path]
int(job_id) if job_id.isdigit() else job_id:
remote2local_path_dict[remote_path]
for job_id, remote_path in job_id_remote_path_dict.items()
}

Expand Down
11 changes: 6 additions & 5 deletions sky/jobs/client/sdk_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ async def tail_logs(cluster_name: str,

@usage_lib.entrypoint
async def download_logs(
name: Optional[str],
job_id: Optional[int],
refresh: bool,
controller: bool,
local_dir: str = constants.SKY_LOGS_DIRECTORY) -> Dict[int, str]:
name: Optional[str],
job_id: Optional[Union[int, str]],
refresh: bool,
controller: bool,
local_dir: str = constants.SKY_LOGS_DIRECTORY
) -> Dict[Union[int, str], str]:
"""Async version of download_logs() that syncs down logs of managed jobs."""
return await context_utils.to_thread(sdk.download_logs, name, job_id,
refresh, controller, local_dir)
Expand Down
2 changes: 1 addition & 1 deletion sky/jobs/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ def tail_logs(name: Optional[str],
@usage_lib.entrypoint
def download_logs(
name: Optional[str],
job_id: Optional[int],
job_id: Optional[Union[int, str]],
refresh: bool,
controller: bool,
local_dir: str = skylet_constants.SKY_LOGS_DIRECTORY) -> Dict[str, str]:
Expand Down
88 changes: 86 additions & 2 deletions sky/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,14 +909,86 @@ def cancel_jobs_by_pool(pool_name: str,
return cancel_jobs_by_id(job_ids, current_workspace=current_workspace)


def controller_log_file_for_job(job_id: int,
def controller_log_file_for_job(job_id: Union[int, str],
create_if_not_exists: bool = False) -> str:
log_dir = os.path.expanduser(managed_job_constants.JOBS_CONTROLLER_LOGS_DIR)
if create_if_not_exists:
os.makedirs(log_dir, exist_ok=True)
return os.path.join(log_dir, f'{job_id}.log')


def find_controller_pid_by_uuid(uuid: str) -> Optional[int]:
    """Find the PID of a controller process with the given UUID.

    This is equivalent to: ps -ax | grep controller | grep <uuid>

    A process matches when its space-joined command line contains
    'controller' (case-insensitively) and contains ``uuid`` verbatim. The
    first matching process encountered is returned.

    Args:
        uuid: The UUID of the controller process to find.

    Returns:
        The PID of the controller process if found, None otherwise. None is
        also returned if enumerating processes fails for any reason.
    """
    try:
        # Requesting attrs makes process_iter() prefetch them into
        # `proc.info`; values that could not be read (access denied, zombie)
        # come back as None, and processes that vanished are skipped by the
        # iterator itself. Reading the prefetched value avoids querying the
        # command line of every process a second time.
        for proc in psutil.process_iter(['pid', 'cmdline']):
            cmdline = proc.info.get('cmdline')
            if not cmdline:
                continue
            cmd_str = ' '.join(cmdline)
            # Check if this is a controller process with the given UUID
            if 'controller' in cmd_str.lower() and uuid in cmd_str:
                return proc.pid
    except Exception:  # pylint: disable=broad-except
        # Best-effort lookup: callers treat None as "no such process", so a
        # failure to enumerate processes must not propagate.
        return None

    return None


def stream_controller_logs(controller_uuid: str,
                           follow: bool = True) -> Tuple[str, int]:
    """Stream the log file of a controller process to stdout.

    The existing content of the log file is printed first. With ``follow``
    set, new lines keep being printed until the controller process is no
    longer running (or could not be found at all), after which any trailing
    output is drained.

    Args:
        controller_uuid: Identifier of the controller, which must start with
            'controller_'. It is used both to locate the log file and to
            look up the controller process.
        follow: Whether to keep tailing the log while the controller process
            is alive.

    Returns:
        A tuple of an empty message and exceptions.JobExitCode.SUCCEEDED.

    Raises:
        ValueError: If the controller log file does not exist.
    """
    assert controller_uuid.startswith('controller_')

    controller_log_path = controller_log_file_for_job(controller_uuid)
    if not os.path.exists(controller_log_path):
        with ux_utils.print_exception_no_traceback():
            raise ValueError(
                f'Controller log file {controller_log_path} not found.')

    # command was started with this pid; None if no matching process is
    # running (e.g. the controller has already exited).
    pid = find_controller_pid_by_uuid(controller_uuid)

    with open(controller_log_path, 'r', encoding='utf-8') as f:
        for line in f:
            print(line, end='', flush=True)

        if follow:
            while True:
                # Print all new lines, if there are any.
                line = f.readline()
                while line:
                    print(line, end='')
                    line = f.readline()

                # Flush.
                print(end='', flush=True)

                # Check if the controller process is still running. A
                # missing pid means there is nothing to wait for, and
                # psutil.pid_exists() must not be called with None.
                if pid is None or not psutil.pid_exists(pid):
                    break

                time.sleep(log_lib.SKY_LOG_TAILING_GAP_SECONDS)

            # Wait for final logs to be written.
            time.sleep(1 + log_lib.SKY_LOG_TAILING_GAP_SECONDS)

            # Print whatever was written while waiting, including a last
            # line without a trailing newline; otherwise the wait above
            # would be pointless and the tail of the log would be lost.
            print(f.read(), end='', flush=True)

    return '', exceptions.JobExitCode.SUCCEEDED


def stream_logs_by_id(job_id: int,
follow: bool = True,
tail: Optional[int] = None) -> Tuple[str, int]:
Expand Down Expand Up @@ -1205,7 +1277,7 @@ def is_managed_job_status_updated(
managed_job_status)


def stream_logs(job_id: Optional[int],
def stream_logs(job_id: Optional[Union[int, str]],
job_name: Optional[str],
controller: bool = False,
follow: bool = True,
Expand All @@ -1217,11 +1289,23 @@ def stream_logs(job_id: Optional[int],
or failure of the job. 0 if success, 100 if the job failed.
See exceptions.JobExitCode for possible exit codes.
"""

if isinstance(job_id, str):
assert controller

if job_id.startswith('controller_'):
return stream_controller_logs(job_id, follow)
else:
return '', exceptions.JobExitCode.NOT_FOUND

if job_id is None and job_name is None:
job_id = managed_job_state.get_latest_job_id()
if job_id is None:
return 'No managed job found.', exceptions.JobExitCode.NOT_FOUND

# due to checking if job_id is a string, we can safely assume its an
# optional int
job_id: Optional[int] = job_id # type: ignore
if controller:
if job_id is None:
assert job_name is not None
Expand Down
4 changes: 2 additions & 2 deletions sky/server/requests/payloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class JobsCancelBody(RequestBody):
class JobsLogsBody(RequestBody):
"""The request body for the jobs logs endpoint."""
name: Optional[str] = None
job_id: Optional[int] = None
job_id: Optional[Union[int, str]] = None
follow: bool = True
controller: bool = False
refresh: bool = False
Expand Down Expand Up @@ -737,7 +737,7 @@ class StreamBody(pydantic.BaseModel):
class JobsDownloadLogsBody(RequestBody):
"""The request body for the jobs download logs endpoint."""
name: Optional[str]
job_id: Optional[int]
job_id: Optional[Union[int, str]]
refresh: bool = False
controller: bool = False
local_dir: str = constants.SKY_LOGS_DIRECTORY
Expand Down
Loading