Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 20 additions & 42 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from sky.clouds.utils import gcp_utils
from sky.dag import DEFAULT_EXECUTION
from sky.data import data_utils
from sky.data import mounting_utils
from sky.data import storage as storage_lib
from sky.provision import common as provision_common
from sky.provision import constants as provision_constants
Expand Down Expand Up @@ -3901,7 +3902,15 @@ def _sync_file_mounts(
controller_utils.replace_skypilot_config_path_in_file_mounts(
launched_resources.cloud, all_file_mounts)
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts)
# Storage MOUNT may already have been done at provision time (e.g.
# Slurm mounts from the persistent batch job so the FUSE daemon
# survives proctrack/cgroup). Metadata is still recorded below --
# it is a local DB write that `sky start` relies on.
if handle.provision_runtime_metadata.storage_mounts_synced:
logger.info('Skipping storage mounts: provisioner reported '
'they are already mounted.')
else:
self._execute_storage_mounts(handle, storage_mounts)
self._set_storage_mounts_metadata(handle.cluster_name,
storage_mounts)

Expand Down Expand Up @@ -6378,60 +6387,29 @@ def _execute_storage_mounts(
if storage_mounts is None:
return

# Process only mount mode objects here. COPY mode objects have been
# converted to regular copy file mounts and thus have been handled
# in the '_execute_file_mounts' method.
storage_mounts = {
path: storage_mount
for path, storage_mount in storage_mounts.items()
if storage_mount.mode in storage_lib.MOUNTABLE_STORAGE_MODES
}
# Construct each mountable store and build its mount command (COPY-mode
# storages are skipped here -- they are handled as regular file mounts
# in '_execute_file_mounts'). Shared with the Slurm provision-time
# template_override so the two paths stay in sync.
mount_specs = mounting_utils.resolve_mount_commands(
storage_mounts, cluster_name=handle.cluster_name)

# Handle cases when there aren't any Storages with either MOUNT or
# MOUNT_CACHED mode.
if not storage_mounts:
if not mount_specs:
return
start = time.time()
runners = handle.get_command_runners()
num_threads = subprocess_utils.get_parallel_threads(
str(handle.launched_resources.cloud))
log_path = os.path.join(self.log_dir, 'storage_mounts.log')

plural = 's' if len(storage_mounts) > 1 else ''
plural = 's' if len(mount_specs) > 1 else ''
rich_utils.force_update_status(
ux_utils.spinner_message(
f'Mounting {len(storage_mounts)} storage{plural}', log_path))
f'Mounting {len(mount_specs)} storage{plural}', log_path))

for dst, storage_obj in storage_mounts.items():
storage_obj.construct()
if not os.path.isabs(dst) and not dst.startswith('~/'):
dst = f'{SKY_REMOTE_WORKDIR}/{dst}'
# Raised when the bucket is externall removed before re-mounting
# with sky start.
if not storage_obj.stores:
with ux_utils.print_exception_no_traceback():
raise exceptions.StorageExternalDeletionError(
f'The bucket, {storage_obj.name!r}, could not be '
f'mounted on cluster {handle.cluster_name!r}. Please '
'verify that the bucket exists. The cluster started '
'successfully without mounting the bucket.')
# Get the first store and use it to mount
store = list(storage_obj.stores.values())[0]
assert store is not None, storage_obj
if storage_obj.mode == storage_lib.StorageMode.MOUNT:
read_only = bool(storage_obj.mount_config and
storage_obj.mount_config.read_only)
mount_cmd = store.mount_command(dst, read_only=read_only)
action_message = 'Mounting'
else:
assert storage_obj.mode == storage_lib.StorageMode.MOUNT_CACHED
mount_cmd = store.mount_cached_command(
dst, config=storage_obj.resolve_mount_cached_config())
action_message = 'Mounting cached mode'
src_print = (storage_obj.source
if storage_obj.source else storage_obj.name)
if isinstance(src_print, list):
src_print = ', '.join(src_print)
for dst, mount_cmd, action_message, src_print in mount_specs:
try:
backend_utils.parallel_data_transfer_to_nodes(
runners,
Expand Down
82 changes: 74 additions & 8 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Helper functions for object store mounting in Sky Storage"""
import hashlib
import os
import random
import shlex
import textwrap
import typing
Expand Down Expand Up @@ -957,6 +956,65 @@ def get_mounting_script(
return script


def resolve_mount_commands(
storage_mounts: Optional[typing.Dict[str, 'storage.Storage']],
cluster_name: Optional[str] = None,
) -> typing.List[typing.Tuple[str, str, str, Optional[str]]]:
"""Builds the mount command for each MOUNT / MOUNT_CACHED storage.

For each mountable storage, this constructs the store (which validates /
materializes the bucket) and generates its mount command, returning a list
of ``(dst, mount_cmd, action_message, src_print)`` tuples (COPY-mode
storages are skipped — they are handled as regular file mounts).

NOTE: ``storage_obj.construct()`` has cloud-side effects (bucket existence
checks / creation), so call this on the API server, never inside a remote
subprocess. Shared by the runtime path
(``CloudVmRayBackend._execute_storage_mounts``) and the Slurm
provision-time ``template_override``, so the two stay in sync.

Raises:
exceptions.StorageExternalDeletionError: if a bucket no longer exists.
"""
# pylint: disable=import-outside-toplevel
from sky.data import storage as storage_lib

specs: typing.List[typing.Tuple[str, str, str, Optional[str]]] = []
if not storage_mounts:
return specs
on_cluster = f' on cluster {cluster_name!r}' if cluster_name else ''
for dst, storage_obj in storage_mounts.items():
if storage_obj.mode not in storage_lib.MOUNTABLE_STORAGE_MODES:
continue
storage_obj.construct()
if not os.path.isabs(dst) and not dst.startswith('~/'):
dst = f'{constants.SKY_REMOTE_WORKDIR}/{dst}'
# Raised when the bucket is externally removed before (re-)mounting.
if not storage_obj.stores:
raise exceptions.StorageExternalDeletionError(
f'The bucket, {storage_obj.name!r}, could not be mounted'
f'{on_cluster}. Please verify that the bucket exists.')
# Get the first store and use it to mount.
store = list(storage_obj.stores.values())[0]
assert store is not None, storage_obj
if storage_obj.mode == storage_lib.StorageMode.MOUNT:
read_only = bool(storage_obj.mount_config and
storage_obj.mount_config.read_only)
mount_cmd = store.mount_command(dst, read_only=read_only)
action_message = 'Mounting'
else:
assert storage_obj.mode == storage_lib.StorageMode.MOUNT_CACHED
mount_cmd = store.mount_cached_command(
dst, config=storage_obj.resolve_mount_cached_config())
action_message = 'Mounting cached mode'
src_print = (storage_obj.source
if storage_obj.source else storage_obj.name)
if isinstance(src_print, list):
src_print = ', '.join(src_print)
specs.append((dst, mount_cmd, action_message, src_print))
return specs


def get_mounting_command(
mount_path: str,
install_cmd: str,
Expand All @@ -983,11 +1041,19 @@ def get_mounting_command(
script = get_mounting_script(mount_path, mount_cmd, install_cmd,
version_check_cmd)

# While these commands are run sequentially for each storage object,
# we add random int to be on the safer side and avoid collisions.
script_path = f'~/.sky/mount_{random.randint(0, 1000000)}.sh'
command = (f'echo {shlex.quote(script)} > {script_path} && '
f'chmod +x {script_path} && '
f'bash {script_path} && '
f'rm {script_path}')
# Write the script to a unique temp file (via mktemp) and execute it.
# mktemp -- rather than a Python-baked random name -- keeps the path
# unique *per process*, which matters when the SAME generated command runs
# concurrently on multiple nodes that share a filesystem (e.g. Slurm
# provision-time mounting from one `srun --nodes=N` step, where ~ is the
# shared cluster home). A baked name collides across those nodes: they race
# on the same file and the trailing `rm` fails on all but one, which (under
# `set -e`) kills those tasks and their freshly-spawned FUSE daemons.
# Falls back to TMPDIR if ~/.sky is unavailable.
command = ('mount_script=$(mktemp ~/.sky/mount_XXXXXX.sh 2>/dev/null || '
'mktemp -t sky_mount_XXXXXX.sh) && '
f'echo {shlex.quote(script)} > "$mount_script" && '
'chmod +x "$mount_script" && '
'bash "$mount_script" && '
'rm "$mount_script"')
Comment on lines +1053 to +1058

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the mounting script fails or is interrupted, the temporary script file created by mktemp will not be deleted because the rm command is chained with && and will be skipped. This can lead to leftover temporary files in ~/.sky or /tmp.

Using a subshell with an EXIT trap guarantees that the temporary file is cleaned up under all exit conditions (success, failure, or interruption) while correctly preserving the exit status of the mounting script.

Suggested change
command = ('mount_script=$(mktemp ~/.sky/mount_XXXXXX.sh 2>/dev/null || '
'mktemp -t sky_mount_XXXXXX.sh) && '
f'echo {shlex.quote(script)} > "$mount_script" && '
'chmod +x "$mount_script" && '
'bash "$mount_script" && '
'rm "$mount_script"')
command = (
'('
'mount_script=$(mktemp ~/.sky/mount_XXXXXX.sh 2>/dev/null || mktemp -t sky_mount_XXXXXX.sh) && '
'trap \'rm -f "$mount_script"\' EXIT && '
f'echo {shlex.quote(script)} > "$mount_script" && '
'chmod +x "$mount_script" && '
'bash "$mount_script"'
')'
)

return command
10 changes: 10 additions & 0 deletions sky/provision/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,16 @@ def cleanup_custom_multi_network(
raise NotImplementedError


# Register built-in provisioners that expose a template_override hook. Done
# here (not in the submodule) because submodules are imported above before
# register_provisioner is defined. Plugins register their own via install().
register_provisioner(
'slurm',
module=slurm,
template_override=slurm.template_override,
)


@_route_to_cloud_impl
def open_ports(
provider_name: str,
Expand Down
6 changes: 6 additions & 0 deletions sky/provision/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ class ProvisionRuntimeMetadata:
# Whether the task's file_mounts have already been synced to the
# cluster by the provisioner.
file_mounts_synced: bool = False
# Whether the task's MOUNT-mode storages have already been mounted on the
# cluster by the provisioner. Used by Slurm, which mounts at provision time
# from the persistent batch job so the FUSE daemon survives proctrack/
# cgroup step teardown. Narrower than ``file_mounts_synced``: COPY-mode
# file_mounts still sync at runtime.
storage_mounts_synced: bool = False
# Whether the user's ``setup`` commands have already been run on the
# cluster by the provisioner.
setup_done: bool = False
Expand Down
5 changes: 5 additions & 0 deletions sky/provision/slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@
from sky.provision.slurm.instance import query_instances
from sky.provision.slurm.instance import run_instances
from sky.provision.slurm.instance import stop_instances
from sky.provision.slurm.instance import template_override
from sky.provision.slurm.instance import terminate_instances
from sky.provision.slurm.instance import wait_instances

# Note: template_override is registered in sky/provision/__init__.py (after
# register_provisioner is defined), since this module is imported there before
# that definition exists.
Loading